Test compiling against the newest arrow; Fix validity map; Add benchmark script

Remove arrow-tools dependency

Changed zipWithIndex to while loop

Modified benchmark to work with Python 2 timeit

closes apache#13
icexelloss authored and BryanCutler committed Feb 23, 2017
1 parent afd5739 commit a4b958e
Showing 6 changed files with 82 additions and 37 deletions.
41 changes: 41 additions & 0 deletions benchmark.py
@@ -0,0 +1,41 @@
import pyspark
import timeit
import random
from pyspark.sql import SparkSession

numPartition = 8

def time(df, repeat, number):
    print("toPandas with arrow")
    print(timeit.repeat(lambda: df.toPandas(True), repeat=repeat, number=number))

    print("toPandas without arrow")
    print(timeit.repeat(lambda: df.toPandas(False), repeat=repeat, number=number))

def long():
    return random.randint(0, 10000)

def double():
    return random.random()

def genDataLocal(spark, size, columns):
    data = [list([fn() for fn in columns]) for x in range(0, size)]
    df = spark.createDataFrame(data)
    return df

def genData(spark, size, columns):
    rdd = spark.sparkContext\
        .parallelize(range(0, size), numPartition)\
        .map(lambda _: [fn() for fn in columns])
    df = spark.createDataFrame(rdd)
    return df

if __name__ == "__main__":
    spark = SparkSession.builder.appName("ArrowBenchmark").getOrCreate()
    df = genData(spark, 1000 * 1000, [long, double])
    df.cache()
    df.count()

    time(df, 10, 1)

    df.unpersist()
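
Note (not part of the commit): timeit.repeat(..., repeat=10, number=1) returns a list of ten wall-clock timings, each covering a single toPandas call, so the script prints one such list for the Arrow path (toPandas(True)) and one for the existing path (toPandas(False)); the minimum of each list is the usual figure to compare.
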
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -68,7 +68,7 @@ if [[ -n "$SPARK_TESTING" ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
   export PYTHONHASHSEED=0
-  exec "$PYSPARK_DRIVER_PYTHON" -m "$1"
+  exec "$PYSPARK_DRIVER_PYTHON" -m "$@"
   exit
 fi

20 changes: 0 additions & 20 deletions pom.xml
@@ -1891,26 +1891,6 @@
           </exclusion>
         </exclusions>
       </dependency>
-      <dependency>
-        <groupId>org.apache.arrow</groupId>
-        <artifactId>arrow-tools</artifactId>
-        <version>${arrow.version}</version>
-        <scope>test</scope>
-        <exclusions>
-          <exclusion>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-annotations</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>log4j-over-slf4j</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
     </dependencies>
   </dependencyManagement>

5 changes: 0 additions & 5 deletions sql/core/pom.xml
@@ -107,11 +107,6 @@
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-vector</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.arrow</groupId>
-      <artifactId>arrow-tools</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
40 changes: 36 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal

 import io.netty.buffer.ArrowBuf
 import org.apache.arrow.memory.RootAllocator
+import org.apache.arrow.vector.BitVector
 import org.apache.arrow.vector.file.ArrowWriter
 import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
 import org.apache.arrow.vector.types.FloatingPointPrecision
@@ -63,7 +64,6 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.Utils

-
 private[sql] object Dataset {
   def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = {
     new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
@@ -2424,6 +2424,29 @@ class Dataset[T] private[sql](
     Math.ceil(numOfRows / 64.0).toInt * 8
   }

+  private def fillArrow(buf: ArrowBuf, dataType: DataType): Unit = {
+    dataType match {
+      case NullType =>
+      case BooleanType =>
+        buf.writeBoolean(false)
+      case ShortType =>
+        buf.writeShort(0)
+      case IntegerType =>
+        buf.writeInt(0)
+      case LongType =>
+        buf.writeLong(0L)
+      case FloatType =>
+        buf.writeFloat(0f)
+      case DoubleType =>
+        buf.writeDouble(0d)
+      case ByteType =>
+        buf.writeByte(0)
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Unsupported data type ${dataType.simpleString}")
+    }
+  }
+
   /**
    * Get an entry from the InternalRow, and then set to ArrowBuf.
    * Note: No Null check for the entry.
@@ -2464,20 +2487,29 @@

       field.dataType match {
         case IntegerType | LongType | DoubleType | FloatType | BooleanType | ByteType =>
-          val validity = allocator.buffer(numBytesOfBitmap(numOfRows))
+          val validityVector = new BitVector("validity", allocator)
+          val validityMutator = validityVector.getMutator
+          validityVector.allocateNew(numOfRows)
+          validityMutator.setValueCount(numOfRows)
           val buf = allocator.buffer(numOfRows * field.dataType.defaultSize)
           var nullCount = 0
-          rows.foreach { row =>
+          var index = 0
+          while (index < rows.length) {
+            val row = rows(index)
             if (row.isNullAt(ordinal)) {
               nullCount += 1
+              validityMutator.set(index, 0)
+              fillArrow(buf, field.dataType)
             } else {
+              validityMutator.set(index, 1)
               getAndSetToArrow(row, buf, field.dataType, ordinal)
             }
+            index += 1
           }

           val fieldNode = new ArrowFieldNode(numOfRows, nullCount)

-          (Array(validity, buf), Array(fieldNode))
+          (Array(validityVector.getBuffer, buf), Array(fieldNode))

         case StringType =>
           val validityOffset = allocator.buffer(numBytesOfBitmap(numOfRows))
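
A sketch (not part of the commit): the "Fix validity map" change above swaps a hand-allocated bitmap buffer for a BitVector whose mutator records, row by row, whether a value is present, while fillArrow writes a fixed-width placeholder into the data buffer so null slots keep their positions. The following minimal Scala program illustrates that pattern for a single nullable Int column, assuming the same pre-1.0 Arrow Java API that the diff imports (BitVector, ArrowBuf, ArrowFieldNode); the object name and sample data are illustrative only.

import io.netty.buffer.ArrowBuf
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.BitVector
import org.apache.arrow.vector.schema.ArrowFieldNode

object ValidityMapSketch {
  def main(args: Array[String]): Unit = {
    val allocator = new RootAllocator(Long.MaxValue)
    val values = Array[java.lang.Integer](1, null, 3, null, 5)
    val numOfRows = values.length

    // One validity bit per row: 1 = value present, 0 = null.
    val validityVector = new BitVector("validity", allocator)
    val validityMutator = validityVector.getMutator
    validityVector.allocateNew(numOfRows)
    validityMutator.setValueCount(numOfRows)

    // Fixed-width data buffer; null rows still get a placeholder written
    // (what fillArrow does above) so every slot stays 4 bytes wide.
    val buf: ArrowBuf = allocator.buffer(numOfRows * 4)
    var nullCount = 0
    var index = 0
    while (index < numOfRows) {
      if (values(index) == null) {
        nullCount += 1
        validityMutator.set(index, 0)
        buf.writeInt(0) // placeholder, ignored because the validity bit is 0
      } else {
        validityMutator.set(index, 1)
        buf.writeInt(values(index))
      }
      index += 1
    }

    // Metadata that travels with the validity and data buffers in the record batch.
    val fieldNode = new ArrowFieldNode(numOfRows, nullCount)
    println(s"rows=$numOfRows, nulls=$nullCount, node=$fieldNode")

    buf.release()
    validityVector.close()
    allocator.close()
  }
}

Because readers consult the validity bit before the data slot, the zero placeholder written for null rows is never interpreted as a value; it only keeps every slot the same width so values can be addressed by position.
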
11 changes: 4 additions & 7 deletions sql/core/src/test/scala/org/apache/spark/sql/ArrowSuite.scala
@@ -19,14 +19,13 @@ package org.apache.spark.sql
 import java.io.File

 import org.apache.arrow.memory.RootAllocator
-import org.apache.arrow.tools.Integration
-import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot}
+import org.apache.arrow.vector.{BitVector, VectorLoader, VectorSchemaRoot}
 import org.apache.arrow.vector.file.json.JsonFileReader
+import org.apache.arrow.vector.util.Validator

 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}

 class ArrowSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
-  import testImplicits._
   private val nullIntsFile = "test-data/arrowNullInts.json"

   private def testFile(fileName: String): String = {
@@ -43,16 +42,14 @@ class ArrowSuite extends QueryTest with SharedSQLContext with SQLTestUtils {

     val arrowSchema = df.schemaToArrowSchema(df.schema)
     val jsonSchema = jsonReader.start()
-    // TODO - requires changing to public API in arrow, will be addressed in ARROW-411
-    //Integration.compareSchemas(arrowSchema, jsonSchema)
+    Validator.compareSchemas(arrowSchema, jsonSchema)

     val arrowRecordBatch = df.collectAsArrow(allocator)
     val arrowRoot = new VectorSchemaRoot(arrowSchema, allocator)
     val vectorLoader = new VectorLoader(arrowRoot)
     vectorLoader.load(arrowRecordBatch)
     val jsonRoot = jsonReader.read()

-    // TODO - requires changing to public API in arrow, will be addressed in ARROW-411
-    //Integration.compare(arrowRoot, jsonRoot)
+    Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot)
   }
 }
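
Note (not part of the commit): the two TODOs deleted in this test pointed at ARROW-411; switching from org.apache.arrow.tools.Integration to the comparison helpers in org.apache.arrow.vector.util.Validator is what lets the arrow-tools test dependency be dropped from pom.xml and sql/core/pom.xml above.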
