From 0626de74f726622ac3eb251fc9f66aaa3de002d3 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 22 Aug 2018 19:10:24 +0100
Subject: [PATCH 1/4] initial commit

---
 .../aggregate/RowBasedHashMapGenerator.scala  | 40 +++++++------------
 .../VectorizedHashMapGenerator.scala          | 39 ++++++------------
 2 files changed, 27 insertions(+), 52 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index d5508275c48c5..5899bb4a1a6e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -44,31 +44,19 @@ class RowBasedHashMapGenerator(
     groupingKeySchema, bufferSchema) {
 
   override protected def initializeAggregateHashMap(): String = {
-    val generatedKeySchema: String =
-      s"new org.apache.spark.sql.types.StructType()" +
-        groupingKeySchema.map { key =>
-          val keyName = ctx.addReferenceObj("keyName", key.name)
-          key.dataType match {
-            case d: DecimalType =>
-              s""".add($keyName, org.apache.spark.sql.types.DataTypes.createDecimalType(
-                 |${d.precision}, ${d.scale}))""".stripMargin
-            case _ =>
-              s""".add($keyName, org.apache.spark.sql.types.DataTypes.${key.dataType})"""
-          }
-        }.mkString("\n").concat(";")
+    val generatedKeyColTypes = groupingKeySchema
+      .zipWithIndex.map { case (t, i) => (s"_col$i", t.dataType) }
+    val generatedKeySchemaTypes = generatedKeyColTypes
+      .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
+    val generatedKeySchema =
+      ctx.addReferenceObj("generatedKeySchemaTerm", generatedKeySchemaTypes)
 
-    val generatedValueSchema: String =
-      s"new org.apache.spark.sql.types.StructType()" +
-        bufferSchema.map { key =>
-          val keyName = ctx.addReferenceObj("keyName", key.name)
-          key.dataType match {
-            case d: DecimalType =>
-              s""".add($keyName, org.apache.spark.sql.types.DataTypes.createDecimalType(
-                 |${d.precision}, ${d.scale}))""".stripMargin
-            case _ =>
-              s""".add($keyName, org.apache.spark.sql.types.DataTypes.${key.dataType})"""
-          }
-        }.mkString("\n").concat(";")
+    val generatedValueColTypes = bufferSchema
+      .zipWithIndex.map { case (t, i) => (s"_col$i", t.dataType) }
+    val generatedValueTypes = generatedValueColTypes
+      .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
+    val generatedValueSchema =
+      ctx.addReferenceObj("generatedValueSchemaTerm", generatedValueTypes)
 
     s"""
       | private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
@@ -78,8 +66,8 @@ class RowBasedHashMapGenerator(
       | private int numBuckets = (int) (capacity / loadFactor);
       | private int maxSteps = 2;
       | private int numRows = 0;
-      | private org.apache.spark.sql.types.StructType keySchema = $generatedKeySchema
-      | private org.apache.spark.sql.types.StructType valueSchema = $generatedValueSchema
+      | private org.apache.spark.sql.types.StructType keySchema = $generatedKeySchema;
+      | private org.apache.spark.sql.types.StructType valueSchema = $generatedValueSchema;
       | private Object emptyVBase;
       | private long emptyVOff;
       | private int emptyVLen;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index 7b3580cecc60d..513380a0cadb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -52,31 +52,18 @@ class VectorizedHashMapGenerator(
     groupingKeySchema, bufferSchema) {
 
   override protected def initializeAggregateHashMap(): String = {
-    val generatedSchema: String =
-      s"new org.apache.spark.sql.types.StructType()" +
-        (groupingKeySchema ++ bufferSchema).map { key =>
-          val keyName = ctx.addReferenceObj("keyName", key.name)
-          key.dataType match {
-            case d: DecimalType =>
-              s""".add($keyName, org.apache.spark.sql.types.DataTypes.createDecimalType(
-                 |${d.precision}, ${d.scale}))""".stripMargin
-            case _ =>
-              s""".add($keyName, org.apache.spark.sql.types.DataTypes.${key.dataType})"""
-          }
-        }.mkString("\n").concat(";")
+    val generatedColTypes = (groupingKeySchema ++ bufferSchema)
+      .zipWithIndex.map { case (t, i) => (s"_col$i", t.dataType) }
+    val generatedSchemaTypes = generatedColTypes
+      .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
+    val generatedSchema = ctx.addReferenceObj("generatedSchemaTerm", generatedSchemaTypes)
 
-    val generatedAggBufferSchema: String =
-      s"new org.apache.spark.sql.types.StructType()" +
-        bufferSchema.map { key =>
-          val keyName = ctx.addReferenceObj("keyName", key.name)
-          key.dataType match {
-            case d: DecimalType =>
-              s""".add($keyName, org.apache.spark.sql.types.DataTypes.createDecimalType(
-                 |${d.precision}, ${d.scale}))""".stripMargin
-            case _ =>
-              s""".add($keyName, org.apache.spark.sql.types.DataTypes.${key.dataType})"""
-          }
-        }.mkString("\n").concat(";")
+    val generatedAggBufferColTypes = bufferSchema
+      .zipWithIndex.map { case (t, i) => (s"_col$i", t.dataType) }
+    val generatedAggBufferTypes = generatedAggBufferColTypes
+      .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
+    val generatedAggBufferSchema =
+      ctx.addReferenceObj("generatedAggBufferSchemaTerm", generatedAggBufferTypes)
 
     s"""
       | private ${classOf[OnHeapColumnVector].getName}[] vectors;
@@ -88,9 +75,9 @@ class VectorizedHashMapGenerator(
       | private int numBuckets = (int) (capacity / loadFactor);
       | private int maxSteps = 2;
       | private int numRows = 0;
-      | private org.apache.spark.sql.types.StructType schema = $generatedSchema
+      | private org.apache.spark.sql.types.StructType schema = $generatedSchema;
       | private org.apache.spark.sql.types.StructType aggregateBufferSchema =
-      |   $generatedAggBufferSchema
+      |   $generatedAggBufferSchema;
       |
       | public $generatedClassName() {
       |   vectors = ${classOf[OnHeapColumnVector].getName}.allocateColumns(capacity, schema);
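Patch 1 above stops emitting Java source that rebuilt each StructType field by field (with a special case for DecimalType) and instead builds the StructType once on the Scala side, handing it to the generated class through ctx.addReferenceObj. Note also the moved semicolons: the old generated strings carried their own trailing ";" via .concat(";"), so the template lines now end with an explicit ";" instead. Below is a minimal, self-contained sketch of the foldLeft pattern the patch introduces; the two element types are invented for illustration, and only the "_col<i>" naming and the fold itself come from the patch.

    import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StructType}

    // Invented input types; the patch derives these from groupingKeySchema/bufferSchema.
    val dataTypes: Seq[DataType] = Seq(IntegerType, LongType)

    // Name each column "_col<i>", then fold the (name, type) pairs into a StructType.
    val schema = dataTypes.zipWithIndex
      .map { case (t, i) => (s"_col$i", t) }
      .foldLeft(new StructType())((acc, col) => acc.add(col._1, col._2))

    // schema.simpleString == "struct<_col0:int,_col1:bigint>"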
From 86c9c5b31f8a7f8ec722ceeedc8e4812ac7159ef Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 22 Aug 2018 20:42:53 +0100
Subject: [PATCH 2/4] address review comment

remove unnecessary struct type in the generated code
---
 .../aggregate/RowBasedHashMapGenerator.scala  | 15 ++-----------
 .../VectorizedHashMapGenerator.scala          | 22 +++++----------
 2 files changed, 8 insertions(+), 29 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index 5899bb4a1a6e5..717be79c8c3b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -44,19 +44,8 @@ class RowBasedHashMapGenerator(
     groupingKeySchema, bufferSchema) {
 
   override protected def initializeAggregateHashMap(): String = {
-    val generatedKeyColTypes = groupingKeySchema
-      .zipWithIndex.map { case (t, i) => (s"_col$i", t.dataType) }
-    val generatedKeySchemaTypes = generatedKeyColTypes
-      .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
-    val generatedKeySchema =
-      ctx.addReferenceObj("generatedKeySchemaTerm", generatedKeySchemaTypes)
-
-    val generatedValueColTypes = bufferSchema
-      .zipWithIndex.map { case (t, i) => (s"_col$i", t.dataType) }
-    val generatedValueTypes = generatedValueColTypes
-      .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
-    val generatedValueSchema =
-      ctx.addReferenceObj("generatedValueSchemaTerm", generatedValueTypes)
+    val generatedKeySchema = ctx.addReferenceObj("generatedKeySchemaTerm", groupingKeySchema)
+    val generatedValueSchema = ctx.addReferenceObj("generatedValueSchemaTerm", bufferSchema)
 
     s"""
       | private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index 513380a0cadb3..d7f89b362429f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -52,18 +52,10 @@ class VectorizedHashMapGenerator(
     groupingKeySchema, bufferSchema) {
 
   override protected def initializeAggregateHashMap(): String = {
-    val generatedColTypes = (groupingKeySchema ++ bufferSchema)
-      .zipWithIndex.map { case (t, i) => (s"_col$i", t.dataType) }
-    val generatedSchemaTypes = generatedColTypes
-      .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
-    val generatedSchema = ctx.addReferenceObj("generatedSchemaTerm", generatedSchemaTypes)
-
-    val generatedAggBufferColTypes = bufferSchema
-      .zipWithIndex.map { case (t, i) => (s"_col$i", t.dataType) }
-    val generatedAggBufferTypes = generatedAggBufferColTypes
-      .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
-    val generatedAggBufferSchema =
-      ctx.addReferenceObj("generatedAggBufferSchemaTerm", generatedAggBufferTypes)
+    val generatedSchemaStructType = new StructType((groupingKeySchema ++ bufferSchema).toArray)
+    val generatedSchema =
+      ctx.addReferenceObj("generatedSchemaTerm", generatedSchemaStructType)
+    val generatedAggBufferSchemaFieldsLength = bufferSchema.fields.length
 
     s"""
       | private ${classOf[OnHeapColumnVector].getName}[] vectors;
@@ -76,8 +68,6 @@ class VectorizedHashMapGenerator(
       | private int maxSteps = 2;
       | private int numRows = 0;
       | private org.apache.spark.sql.types.StructType schema = $generatedSchema;
-      | private org.apache.spark.sql.types.StructType aggregateBufferSchema =
-      |   $generatedAggBufferSchema;
       |
       | public $generatedClassName() {
       |   vectors = ${classOf[OnHeapColumnVector].getName}.allocateColumns(capacity, schema);
@@ -85,8 +75,8 @@ class VectorizedHashMapGenerator(
       |
       |   // Generates a projection to return the aggregate buffer only.
       |   ${classOf[OnHeapColumnVector].getName}[] aggBufferVectors =
-      |     new ${classOf[OnHeapColumnVector].getName}[aggregateBufferSchema.fields().length];
-      |   for (int i = 0; i < aggregateBufferSchema.fields().length; i++) {
+      |     new ${classOf[OnHeapColumnVector].getName}[$generatedAggBufferSchemaFieldsLength];
+      |   for (int i = 0; i < $generatedAggBufferSchemaFieldsLength; i++) {
       |     aggBufferVectors[i] = vectors[i + ${groupingKeys.length}];
       |   }
       |   aggBufferRow = new ${classOf[MutableColumnarRow].getName}(aggBufferVectors);
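Patch 2 acts on the review observation that groupingKeySchema and bufferSchema are already StructType values, so nothing needs to be rebuilt or renamed: the row-based generator registers them directly, the vectorized one concatenates them into a single StructType, and only the buffer field count survives as a plain Scala Int. The sketch below shows the general mechanism a reference object relies on; it is a deliberately simplified stand-in, not Spark's actual CodegenContext.addReferenceObj implementation.

    import scala.collection.mutable.ArrayBuffer

    // Simplified stand-in for the codegen context: objects live in a
    // `references` array on the Scala side, and the returned string is a
    // Java expression the generated code uses to read them back.
    class MiniContext {
      val references = ArrayBuffer.empty[Any]

      def addReferenceObj(name: String, obj: Any, className: String): String = {
        references += obj
        s"(($className) references[${references.length - 1}] /* $name */)"
      }
    }

    val ctx = new MiniContext
    val term = ctx.addReferenceObj("generatedSchemaTerm", "some schema object",
      "org.apache.spark.sql.types.StructType")
    // term is now a splice-ready Java expression, so the generated class
    // never has to construct the schema itself.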
From 81ef75a36a1c9dcd6922d2ec77393bc35389efd0 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 23 Aug 2018 06:22:41 +0100
Subject: [PATCH 3/4] address review comment

---
 .../execution/aggregate/RowBasedHashMapGenerator.scala | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index 717be79c8c3b1..ca59bb145f299 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -44,8 +44,8 @@ class RowBasedHashMapGenerator(
     groupingKeySchema, bufferSchema) {
 
   override protected def initializeAggregateHashMap(): String = {
-    val generatedKeySchema = ctx.addReferenceObj("generatedKeySchemaTerm", groupingKeySchema)
-    val generatedValueSchema = ctx.addReferenceObj("generatedValueSchemaTerm", bufferSchema)
+    val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
+    val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)
 
     s"""
       | private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
@@ -55,8 +55,6 @@ class RowBasedHashMapGenerator(
       | private int numBuckets = (int) (capacity / loadFactor);
       | private int maxSteps = 2;
       | private int numRows = 0;
-      | private org.apache.spark.sql.types.StructType keySchema = $generatedKeySchema;
-      | private org.apache.spark.sql.types.StructType valueSchema = $generatedValueSchema;
       | private Object emptyVBase;
       | private long emptyVOff;
       | private int emptyVLen;
@@ -67,9 +65,9 @@ class RowBasedHashMapGenerator(
      |     org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
       |     InternalRow emptyAggregationBuffer) {
       |   batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
-      |     .allocate(keySchema, valueSchema, taskMemoryManager, capacity);
+      |     .allocate($keySchema, $valueSchema, taskMemoryManager, capacity);
       |
-      |   final UnsafeProjection valueProjection = UnsafeProjection.create(valueSchema);
+      |   final UnsafeProjection valueProjection = UnsafeProjection.create($valueSchema);
       |   final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
       |
       |   emptyVBase = emptyBuffer;
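Patch 3 then removes the generated keySchema/valueSchema fields entirely and splices the reference expressions straight into the two call sites. A sketch of the shape the generated Java ends up with follows; the references indices and comment markers are illustrative, not taken from real codegen output.

    // Illustrative reference expressions (the real ones come from addReferenceObj).
    val keySchema =
      "((org.apache.spark.sql.types.StructType) references[0] /* keySchemaTerm */)"
    val valueSchema =
      "((org.apache.spark.sql.types.StructType) references[1] /* valueSchemaTerm */)"

    // Same interpolation style as the generator's template: the schema terms
    // are spliced inline, so no schema fields remain in the generated class.
    val generated =
      s"""
         |batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
         |  .allocate($keySchema, $valueSchema, taskMemoryManager, capacity);
         |final UnsafeProjection valueProjection = UnsafeProjection.create($valueSchema);
       """.stripMargin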
From aaa3944f20db36898d231d94281ed08995d1da0b Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 23 Aug 2018 17:48:51 +0100
Subject: [PATCH 4/4] update variable names

---
 .../aggregate/VectorizedHashMapGenerator.scala | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index d7f89b362429f..95ebefed08f67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -52,10 +52,9 @@ class VectorizedHashMapGenerator(
     groupingKeySchema, bufferSchema) {
 
   override protected def initializeAggregateHashMap(): String = {
-    val generatedSchemaStructType = new StructType((groupingKeySchema ++ bufferSchema).toArray)
-    val generatedSchema =
-      ctx.addReferenceObj("generatedSchemaTerm", generatedSchemaStructType)
-    val generatedAggBufferSchemaFieldsLength = bufferSchema.fields.length
+    val schemaStructType = new StructType((groupingKeySchema ++ bufferSchema).toArray)
+    val schema = ctx.addReferenceObj("schemaTerm", schemaStructType)
+    val aggBufferSchemaFieldsLength = bufferSchema.fields.length
 
     s"""
       | private ${classOf[OnHeapColumnVector].getName}[] vectors;
@@ -67,16 +66,15 @@ class VectorizedHashMapGenerator(
       | private int numBuckets = (int) (capacity / loadFactor);
       | private int maxSteps = 2;
       | private int numRows = 0;
-      | private org.apache.spark.sql.types.StructType schema = $generatedSchema;
       |
       | public $generatedClassName() {
-      |   vectors = ${classOf[OnHeapColumnVector].getName}.allocateColumns(capacity, schema);
+      |   vectors = ${classOf[OnHeapColumnVector].getName}.allocateColumns(capacity, $schema);
       |   batch = new ${classOf[ColumnarBatch].getName}(vectors);
       |
       |   // Generates a projection to return the aggregate buffer only.
       |   ${classOf[OnHeapColumnVector].getName}[] aggBufferVectors =
-      |     new ${classOf[OnHeapColumnVector].getName}[$generatedAggBufferSchemaFieldsLength];
-      |   for (int i = 0; i < $generatedAggBufferSchemaFieldsLength; i++) {
+      |     new ${classOf[OnHeapColumnVector].getName}[$aggBufferSchemaFieldsLength];
+      |   for (int i = 0; i < $aggBufferSchemaFieldsLength; i++) {
       |     aggBufferVectors[i] = vectors[i + ${groupingKeys.length}];
       |   }
       |   aggBufferRow = new ${classOf[MutableColumnarRow].getName}(aggBufferVectors);
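Patch 4 is a rename pass with one further simplification: the buffer field count is interpolated into the template as a Java integer literal, so the generated loop bound is a constant instead of a runtime schema.fields().length call. A sketch of that constant splice, using invented values for the field count and key offset:

    // Invented stand-ins: the patch computes these from bufferSchema.fields.length
    // and ${groupingKeys.length}.
    val aggBufferSchemaFieldsLength = 3
    val numKeys = 2

    // The Scala Int is baked into the generated Java as a literal.
    val generated =
      s"""
         |for (int i = 0; i < $aggBufferSchemaFieldsLength; i++) {
         |  aggBufferVectors[i] = vectors[i + $numKeys];
         |}
       """.stripMargin
    // generated contains "i < 3" and "vectors[i + 2]", plain constants the
    // Java compiler sees directly.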