Skip to content

Codegen examples

Animesh Trivedi edited this page Jun 16, 2017 · 4 revisions

An example of the code generated for a Parquet scan (I am guessing it comes from DataSourceScanExec)

/*     */ // Entry point of the generated code: wraps the given references array in a new
/*     */ // GeneratedIterator and returns it as a plain Object.
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/*     */ // Generated row iterator for a two-column columnar scan: it pulls ColumnarBatch
/*     */ // objects from inputs[0] and emits one UnsafeRow per batch row, reading column 0
/*     */ // with getInt and column 1 with getBinary.
/*     */ // partitionIndex, append() and shouldStop() are not declared here; presumably they
/*     */ // are inherited from BufferedRowIterator.
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/*     */   // Source of ColumnarBatch objects (inputs[0]).
/* 008 */   private scala.collection.Iterator scan_input;
/* 009 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 010 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/*     */   // Nanoseconds spent in scan_nextBatch() that have not yet been added to scan_scanTime.
/* 011 */   private long scan_scanTime1;
/*     */   // Batch currently being converted, or null when a new one must be fetched.
/* 012 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/*     */   // Index of the next unread row inside scan_batch.
/* 013 */   private int scan_batchIdx;
/*     */   // Column vectors 0 and 1 of the current batch, cached by scan_nextBatch().
/* 014 */   private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance0;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance1;
/*     */   // Output row plus the buffer holder and writer used to fill it; one instance of each
/*     */   // is created in init() and reused for every emitted row.
/* 016 */   private UnsafeRow scan_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder scan_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter scan_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/*     */   // Binds the input iterator and the two metrics (references[0] and references[1]),
/*     */   // clears the batch state, and allocates the reusable 2-field output row together with
/*     */   // its buffer holder and row writer.
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     scan_input = inputs[0];
/* 028 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 029 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 030 */     scan_scanTime1 = 0;
/* 031 */     scan_batch = null;
/* 032 */     scan_batchIdx = 0;
/* 033 */     scan_colInstance0 = null;
/* 034 */     scan_colInstance1 = null;
/* 035 */     scan_result = new UnsafeRow(2);
/* 036 */     this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 32);
/* 037 */     this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 2);
/* 038 */
/* 039 */   }
/* 040 */
/*     */   // Fetches the next batch from scan_input if one is available: adds its row count to
/*     */   // scan_numOutputRows, rewinds scan_batchIdx and caches the two column vectors.
/*     */   // When the input is exhausted scan_batch is left untouched, so callers null it first.
/*     */   // The elapsed time is accumulated in scan_scanTime1 in either case.
/* 041 */   private void scan_nextBatch() throws java.io.IOException {
/* 042 */     long getBatchStart = System.nanoTime();
/* 043 */     if (scan_input.hasNext()) {
/* 044 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 045 */       scan_numOutputRows.add(scan_batch.numRows());
/* 046 */       scan_batchIdx = 0;
/* 047 */       scan_colInstance0 = scan_batch.column(0);
/* 048 */       scan_colInstance1 = scan_batch.column(1);
/* 049 */
/* 050 */     }
/* 051 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 052 */   }
/* 053 */
/*     */   // Converts the remaining rows of the current batch (and of following batches) into
/*     */   // UnsafeRows and appends them. Returns early whenever shouldStop() is true; because
/*     */   // scan_batch and scan_batchIdx are fields, the next call resumes at the next unread row.
/* 054 */   protected void processNext() throws java.io.IOException {
/* 055 */     if (scan_batch == null) {
/* 056 */       scan_nextBatch();
/* 057 */     }
/* 058 */     while (scan_batch != null) {
/* 059 */       int numRows = scan_batch.numRows();
/* 060 */       while (scan_batchIdx < numRows) {
/* 061 */         int scan_rowIdx = scan_batchIdx++;
/*     */         // Read both columns for this row; -1 / null are placeholders that are never
/*     */         // written because the null flag selects setNullAt() below.
/* 062 */         boolean scan_isNull = scan_colInstance0.isNullAt(scan_rowIdx);
/* 063 */         int scan_value = scan_isNull ? -1 : (scan_colInstance0.getInt(scan_rowIdx));
/* 064 */         boolean scan_isNull1 = scan_colInstance1.isNullAt(scan_rowIdx);
/* 065 */         byte[] scan_value1 = scan_isNull1 ? null : (scan_colInstance1.getBinary(scan_rowIdx));
/* 066 */         scan_holder.reset();
/* 067 */
/* 068 */         scan_rowWriter.zeroOutNullBytes();
/* 069 */
/* 070 */         if (scan_isNull) {
/* 071 */           scan_rowWriter.setNullAt(0);
/* 072 */         } else {
/* 073 */           scan_rowWriter.write(0, scan_value);
/* 074 */         }
/* 075 */
/* 076 */         if (scan_isNull1) {
/* 077 */           scan_rowWriter.setNullAt(1);
/* 078 */         } else {
/* 079 */           scan_rowWriter.write(1, scan_value1);
/* 080 */         }
/* 081 */         scan_result.setTotalSize(scan_holder.totalSize());
/*     */         // The shared scan_result instance itself is appended (no copy is made here).
/* 082 */         append(scan_result);
/* 083 */         if (shouldStop()) return;
/* 084 */       }
/*     */       // Batch fully consumed: clear it so the outer loop ends if no further batch exists.
/* 085 */       scan_batch = null;
/* 086 */       scan_nextBatch();
/* 087 */     }
/*     */     // Input exhausted: flush the accumulated nanoseconds into the metric, divided by
/*     */     // 1000 * 1000 (i.e. as milliseconds), and reset the accumulator.
/* 088 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 089 */     scan_scanTime1 = 0;
/* 090 */   }
/* 091 */ }

Example code for doing a join

The operator that generates this code is yet to be located conclusively. I suspect SortMergeJoinExec, since the generated variables carry the smj_ prefix.

/*     */ // Entry point of the generated code: wraps the given references array in a new
/*     */ // GeneratedIterator and returns it as a plain Object.
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/*     */ // Generated row iterator for an inner join of two inputs (inputs[0] = left,
/*     */ // inputs[1] = right). The join key is the int at field 0 of the struct stored in
/*     */ // column 0 of each row. Each output row has two fields: the left struct and the
/*     */ // right struct.
/*     */ // NOTE(review): the merge logic only ever advances both iterators forward, so it
/*     */ // appears to assume both inputs arrive sorted ascending on the key - confirm.
/*     */ // partitionIndex, append() and shouldStop() are not declared here; presumably they
/*     */ // are inherited from BufferedRowIterator.
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator smj_leftInput;
/* 009 */   private scala.collection.Iterator smj_rightInput;
/*     */   // Current left row; null while findNextInnerJoinRows() still has to fetch one.
/* 010 */   private InternalRow smj_leftRow;
/*     */   // Right row fetched but not yet consumed; it survives across calls. null means the
/*     */   // next right row must be fetched.
/* 011 */   private InternalRow smj_rightRow;
/*     */   // Join key of smj_rightRow.
/* 012 */   private int smj_value4;
/*     */   // Copies of the right rows whose key equals the current left key.
/* 013 */   private java.util.ArrayList smj_matches;
/*     */   // Left key recorded when findNextInnerJoinRows() stops scanning the right side; the
/*     */   // next left row is compared against it to decide whether smj_matches can be reused.
/* 014 */   private int smj_value5;
/*     */   // Struct in column 0 of the current left row (set in processNext()).
/* 015 */   private InternalRow smj_value6;
/* 016 */   private org.apache.spark.sql.execution.metric.SQLMetric smj_numOutputRows;
/*     */   // Output row and its buffer holder. smj_rowWriter fills the two top-level fields;
/*     */   // smj_rowWriter1 / smj_rowWriter2 write the left / right nested struct field by field.
/* 017 */   private UnsafeRow smj_result;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder smj_holder;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter1;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter smj_rowWriter2;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/*     */   // Binds the two input iterators and the output-row metric (references[0]), creates an
/*     */   // empty match buffer, and allocates the output row, its buffer holder and the three
/*     */   // row writers that share that holder.
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     smj_leftInput = inputs[0];
/* 031 */     smj_rightInput = inputs[1];
/* 032 */
/* 033 */     smj_rightRow = null;
/* 034 */
/* 035 */     smj_matches = new java.util.ArrayList();
/* 036 */
/* 037 */     this.smj_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 038 */     smj_result = new UnsafeRow(2);
/* 039 */     this.smj_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_result, 64);
/* 040 */     this.smj_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_holder, 2);
/* 041 */     this.smj_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_holder, 5);
/* 042 */     this.smj_rowWriter2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_holder, 5);
/* 043 */
/* 044 */   }
/* 045 */
/*     */   // Advances to the next left row that has at least one matching right row.
/*     */   // Returns true with smj_leftRow set and smj_matches holding the matching right rows;
/*     */   // returns false once the left input is exhausted, or once the right input is exhausted
/*     */   // and no match was buffered for the current left row.
/*     */   // Rows whose key is null are skipped on both sides.
/* 046 */   private boolean findNextInnerJoinRows(
/* 047 */     scala.collection.Iterator leftIter,
/* 048 */     scala.collection.Iterator rightIter) {
/* 049 */     smj_leftRow = null;
/* 050 */     int comp = 0;
/* 051 */     while (smj_leftRow == null) {
/* 052 */       if (!leftIter.hasNext()) return false;
/* 053 */       smj_leftRow = (InternalRow) leftIter.next();
/* 054 */
/*     */       // Extract the left join key: field 0 of the struct in column 0.
/* 055 */       InternalRow smj_value1 = smj_leftRow.getStruct(0, 5);
/* 056 */       boolean smj_isNull = false;
/* 057 */       int smj_value = -1;
/* 058 */
/* 059 */       if (smj_value1.isNullAt(0)) {
/* 060 */         smj_isNull = true;
/* 061 */       } else {
/* 062 */         smj_value = smj_value1.getInt(0);
/* 063 */       }
/*     */       // A left row with a null key is dropped and the next left row is fetched.
/* 064 */       if (smj_isNull) {
/* 065 */         smj_leftRow = null;
/* 066 */         continue;
/* 067 */       }
/*     */       // Matches left over from the previous left row: reuse them if this left row has
/*     */       // the same key (smj_value5), otherwise discard them.
/* 068 */       if (!smj_matches.isEmpty()) {
/* 069 */         comp = 0;
/* 070 */         if (comp == 0) {
/* 071 */           comp = (smj_value > smj_value5 ? 1 : smj_value < smj_value5 ? -1 : 0);
/* 072 */         }
/* 073 */
/* 074 */         if (comp == 0) {
/* 075 */           return true;
/* 076 */         }
/* 077 */         smj_matches.clear();
/* 078 */       }
/* 079 */
/*     */       // Scan the right side until the current left row is either matched or dropped.
/* 080 */       do {
/* 081 */         if (smj_rightRow == null) {
/*     */           // Right input exhausted: remember the key and report whether anything matched.
/* 082 */           if (!rightIter.hasNext()) {
/* 083 */             smj_value5 = smj_value;
/* 084 */             return !smj_matches.isEmpty();
/* 085 */           }
/* 086 */           smj_rightRow = (InternalRow) rightIter.next();
/* 087 */
/*     */           // Extract the right join key the same way as the left one.
/* 088 */           InternalRow smj_value3 = smj_rightRow.getStruct(0, 5);
/* 089 */           boolean smj_isNull2 = false;
/* 090 */           int smj_value2 = -1;
/* 091 */
/* 092 */           if (smj_value3.isNullAt(0)) {
/* 093 */             smj_isNull2 = true;
/* 094 */           } else {
/* 095 */             smj_value2 = smj_value3.getInt(0);
/* 096 */           }
/*     */           // A right row with a null key is dropped; the loop continues with the next one.
/* 097 */           if (smj_isNull2) {
/* 098 */             smj_rightRow = null;
/* 099 */             continue;
/* 100 */           }
/* 101 */           smj_value4 = smj_value2;
/* 102 */         }
/* 103 */
/*     */         // Compare the left key with the right key.
/* 104 */         comp = 0;
/* 105 */         if (comp == 0) {
/* 106 */           comp = (smj_value > smj_value4 ? 1 : smj_value < smj_value4 ? -1 : 0);
/* 107 */         }
/* 108 */
/* 109 */         if (comp > 0) {
/*     */           // Left key is larger: discard this right row and fetch the next one.
/* 110 */           smj_rightRow = null;
/* 111 */         } else if (comp < 0) {
/*     */           // Left key is smaller: the run of equal right keys (if any) has ended. The
/*     */           // right row stays in smj_rightRow for the next call.
/* 112 */           if (!smj_matches.isEmpty()) {
/* 113 */             smj_value5 = smj_value;
/* 114 */             return true;
/* 115 */           }
/*     */           // No match for this left row: drop it, which ends the do-while and makes the
/*     */           // outer loop fetch the next left row.
/* 116 */           smj_leftRow = null;
/* 117 */         } else {
/*     */           // Keys are equal: buffer a copy of the right row and move on to the next one.
/* 118 */           smj_matches.add(smj_rightRow.copy());
/* 119 */           smj_rightRow = null;;
/* 120 */         }
/* 121 */       } while (smj_leftRow != null);
/* 122 */     }
/* 123 */     return false; // unreachable
/* 124 */   }
/* 125 */
/*     */   // For every left row that has matches, emits one output row per buffered right row.
/*     */   // shouldStop() is checked only after all matches of the current left row were emitted.
/* 126 */   protected void processNext() throws java.io.IOException {
/* 127 */     while (findNextInnerJoinRows(smj_leftInput, smj_rightInput)) {
/* 128 */       int smj_size = smj_matches.size();
/* 129 */       smj_value6 = smj_leftRow.getStruct(0, 5);
/* 130 */       for (int smj_i = 0; smj_i < smj_size; smj_i ++) {
/* 131 */         InternalRow smj_rightRow1 = (InternalRow) smj_matches.get(smj_i);
/* 132 */
/* 133 */         smj_numOutputRows.add(1);
/* 134 */
/* 135 */         InternalRow smj_value7 = smj_rightRow1.getStruct(0, 5);
/* 136 */         smj_holder.reset();
/* 137 */
/*     */         // ---- Output field 0: the left struct ----
/* 138 */         // Remember the current cursor so that we can calculate how many bytes are
/* 139 */         // written later.
/* 140 */         final int smj_tmpCursor = smj_holder.cursor;
/* 141 */
/*     */         // If the struct is already an UnsafeRow, copy its bytes into the buffer as-is;
/*     */         // otherwise write its five fields (int, long, double, float, UTF8String) one by one.
/* 142 */         if (smj_value6 instanceof UnsafeRow) {
/* 143 */           final int smj_sizeInBytes = ((UnsafeRow) smj_value6).getSizeInBytes();
/* 144 */           // grow the global buffer before writing data.
/* 145 */           smj_holder.grow(smj_sizeInBytes);
/* 146 */           ((UnsafeRow) smj_value6).writeToMemory(smj_holder.buffer, smj_holder.cursor);
/* 147 */           smj_holder.cursor += smj_sizeInBytes;
/* 148 */
/* 149 */         } else {
/* 150 */           smj_rowWriter1.reset();
/* 151 */
/*     */           // Note: each getter below runs before the matching isNullAt() check; the value
/*     */           // read is only written when the field is not null.
/* 152 */           final int smj_fieldName = smj_value6.getInt(0);
/* 153 */           if (smj_value6.isNullAt(0)) {
/* 154 */             smj_rowWriter1.setNullAt(0);
/* 155 */           } else {
/* 156 */             smj_rowWriter1.write(0, smj_fieldName);
/* 157 */           }
/* 158 */
/* 159 */           final long smj_fieldName1 = smj_value6.getLong(1);
/* 160 */           if (smj_value6.isNullAt(1)) {
/* 161 */             smj_rowWriter1.setNullAt(1);
/* 162 */           } else {
/* 163 */             smj_rowWriter1.write(1, smj_fieldName1);
/* 164 */           }
/* 165 */
/* 166 */           final double smj_fieldName2 = smj_value6.getDouble(2);
/* 167 */           if (smj_value6.isNullAt(2)) {
/* 168 */             smj_rowWriter1.setNullAt(2);
/* 169 */           } else {
/* 170 */             smj_rowWriter1.write(2, smj_fieldName2);
/* 171 */           }
/* 172 */
/* 173 */           final float smj_fieldName3 = smj_value6.getFloat(3);
/* 174 */           if (smj_value6.isNullAt(3)) {
/* 175 */             smj_rowWriter1.setNullAt(3);
/* 176 */           } else {
/* 177 */             smj_rowWriter1.write(3, smj_fieldName3);
/* 178 */           }
/* 179 */
/* 180 */           final UTF8String smj_fieldName4 = smj_value6.getUTF8String(4);
/* 181 */           if (smj_value6.isNullAt(4)) {
/* 182 */             smj_rowWriter1.setNullAt(4);
/* 183 */           } else {
/* 184 */             smj_rowWriter1.write(4, smj_fieldName4);
/* 185 */           }
/* 186 */         }
/* 187 */
/*     */         // Record where the left struct starts and how many bytes it occupies.
/* 188 */         smj_rowWriter.setOffsetAndSize(0, smj_tmpCursor, smj_holder.cursor - smj_tmpCursor);
/* 189 */
/*     */         // ---- Output field 1: the right struct, handled the same way ----
/* 190 */         // Remember the current cursor so that we can calculate how many bytes are
/* 191 */         // written later.
/* 192 */         final int smj_tmpCursor6 = smj_holder.cursor;
/* 193 */
/* 194 */         if (smj_value7 instanceof UnsafeRow) {
/* 195 */           final int smj_sizeInBytes1 = ((UnsafeRow) smj_value7).getSizeInBytes();
/* 196 */           // grow the global buffer before writing data.
/* 197 */           smj_holder.grow(smj_sizeInBytes1);
/* 198 */           ((UnsafeRow) smj_value7).writeToMemory(smj_holder.buffer, smj_holder.cursor);
/* 199 */           smj_holder.cursor += smj_sizeInBytes1;
/* 200 */
/* 201 */         } else {
/* 202 */           smj_rowWriter2.reset();
/* 203 */
/* 204 */           final int smj_fieldName5 = smj_value7.getInt(0);
/* 205 */           if (smj_value7.isNullAt(0)) {
/* 206 */             smj_rowWriter2.setNullAt(0);
/* 207 */           } else {
/* 208 */             smj_rowWriter2.write(0, smj_fieldName5);
/* 209 */           }
/* 210 */
/* 211 */           final long smj_fieldName6 = smj_value7.getLong(1);
/* 212 */           if (smj_value7.isNullAt(1)) {
/* 213 */             smj_rowWriter2.setNullAt(1);
/* 214 */           } else {
/* 215 */             smj_rowWriter2.write(1, smj_fieldName6);
/* 216 */           }
/* 217 */
/* 218 */           final double smj_fieldName7 = smj_value7.getDouble(2);
/* 219 */           if (smj_value7.isNullAt(2)) {
/* 220 */             smj_rowWriter2.setNullAt(2);
/* 221 */           } else {
/* 222 */             smj_rowWriter2.write(2, smj_fieldName7);
/* 223 */           }
/* 224 */
/* 225 */           final float smj_fieldName8 = smj_value7.getFloat(3);
/* 226 */           if (smj_value7.isNullAt(3)) {
/* 227 */             smj_rowWriter2.setNullAt(3);
/* 228 */           } else {
/* 229 */             smj_rowWriter2.write(3, smj_fieldName8);
/* 230 */           }
/* 231 */
/* 232 */           final UTF8String smj_fieldName9 = smj_value7.getUTF8String(4);
/* 233 */           if (smj_value7.isNullAt(4)) {
/* 234 */             smj_rowWriter2.setNullAt(4);
/* 235 */           } else {
/* 236 */             smj_rowWriter2.write(4, smj_fieldName9);
/* 237 */           }
/* 238 */         }
/* 239 */
/* 240 */         smj_rowWriter.setOffsetAndSize(1, smj_tmpCursor6, smj_holder.cursor - smj_tmpCursor6);
/* 241 */         smj_result.setTotalSize(smj_holder.totalSize());
/*     */         // A copy is appended because smj_result is overwritten on the next iteration
/*     */         // of this loop before control returns to the caller.
/* 242 */         append(smj_result.copy());
/* 243 */
/* 244 */       }
/* 245 */       if (shouldStop()) return;
/* 246 */     }
/* 247 */   }
/* 248 */ }

SortExec

This is where the shuffle iterator is consumed.

/*     */ // Entry point of the generated code: wraps the given references array in a new
/*     */ // GeneratedIterator and returns it as a plain Object.
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/*     */ // Generated row iterator for a sort: on the first processNext() call it feeds every
/*     */ // row of inputs[0] into an UnsafeExternalRowSorter obtained from the SortExec plan,
/*     */ // sorts, records the sort metrics, and then emits the sorted rows.
/*     */ // partitionIndex, append() and shouldStop() are not declared here; presumably they
/*     */ // are inherited from BufferedRowIterator.
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/*     */   // True until the input has been loaded and sorted once.
/* 008 */   private boolean sort_needToSort;
/* 009 */   private org.apache.spark.sql.execution.SortExec sort_plan;
/* 010 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 011 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/*     */   // Iterator over the sorted rows; assigned by the first processNext() call.
/* 012 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 013 */   private scala.collection.Iterator inputadapter_input;
/* 014 */   private org.apache.spark.sql.execution.metric.SQLMetric sort_peakMemory;
/* 015 */   private org.apache.spark.sql.execution.metric.SQLMetric sort_spillSize;
/* 016 */   private org.apache.spark.sql.execution.metric.SQLMetric sort_sortTime;
/* 017 */
/* 018 */   public GeneratedIterator(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/*     */   // Resolves the SortExec plan (references[0]) and creates its sorter, grabs the task
/*     */   // metrics from the current TaskContext, and binds the input iterator and the three
/*     */   // SQL metrics (references[1..3]: peak memory, spill size, sort time).
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */     sort_needToSort = true;
/* 026 */     this.sort_plan = (org.apache.spark.sql.execution.SortExec) references[0];
/* 027 */     sort_sorter = sort_plan.createSorter();
/* 028 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 029 */
/* 030 */     inputadapter_input = inputs[0];
/* 031 */     this.sort_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 032 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 033 */     this.sort_sortTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 034 */
/* 035 */   }
/* 036 */
/*     */   // Inserts every row of the input iterator into the sorter; each row is cast to UnsafeRow.
/*     */   // NOTE(review): if shouldStop() were true here this would return before the input is
/*     */   // exhausted, yet processNext() calls sort() right afterwards. Presumably shouldStop()
/*     */   // stays false because nothing is appended while inserting - confirm.
/* 037 */   private void sort_addToSorter() throws java.io.IOException {
/* 038 */     while (inputadapter_input.hasNext()) {
/* 039 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */       sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 041 */       if (shouldStop()) return;
/* 042 */     }
/* 043 */
/* 044 */   }
/* 045 */
/*     */   // First call: loads and sorts the input, then updates the metrics. Every call: appends
/*     */   // sorted rows until shouldStop() is true or the sorted iterator is exhausted; the
/*     */   // iterator is a field, so a later call continues where this one stopped.
/* 046 */   protected void processNext() throws java.io.IOException {
/* 047 */     if (sort_needToSort) {
/*     */       // Snapshot the spilled-bytes counter so that only the delta is added to sort_spillSize.
/* 048 */       long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 049 */       sort_addToSorter();
/* 050 */       sort_sortedIter = sort_sorter.sort();
/*     */       // getSortTimeNanos() / 1000000: nanoseconds to milliseconds, going by the getter name.
/* 051 */       sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 052 */       sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 053 */       sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 054 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 055 */       sort_needToSort = false;
/* 056 */     }
/* 057 */
/* 058 */     while (sort_sortedIter.hasNext()) {
/* 059 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 060 */
/* 061 */       append(sort_outputRow);
/* 062 */
/* 063 */       if (shouldStop()) return;
/* 064 */     }
/* 065 */   }
/* 066 */ }
Clone this wiki locally