[SPARK-22032][PYSPARK] Speed up StructType conversion
## What changes were proposed in this pull request?

StructType.fromInternal calls f.fromInternal(v) for every field.
We can use precalculated type information to limit the number of function calls (the information is computed once per StructType and then reused for every record).
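The core idea, restated as a minimal self-contained sketch (simplified from the diff below; `StructTypeSketch` is an illustrative name, not Spark code):

```
# Simplified sketch of the optimization, not the actual Spark code:
# the boolean mask is computed once per StructType, so the per-record
# hot path only calls fromInternal() for fields that really need it.
class StructTypeSketch(object):
    def __init__(self, fields):
        self.fields = fields
        # computed once per schema
        self._needConversion = [f.needConversion() for f in fields]
        self._needSerializeAnyField = any(self._needConversion)

    def fromInternal(self, obj):
        if not self._needSerializeAnyField:
            return obj
        # per record: skip the function call for fields that need no conversion
        return [f.fromInternal(v) if c else v
                for f, v, c in zip(self.fields, obj, self._needConversion)]
```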

Benchmarks (Python profiler)
```
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
```
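How the cProfile tables below were produced is not spelled out in the description; one plausible setup (an assumption, not confirmed by the PR) is PySpark's built-in worker profiling around the snippet above:

```
# Assumed profiling setup: enable PySpark worker profiling, run the snippet
# above, then dump the aggregated cProfile stats.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.python.profile", "true")   # must be set before the SparkContext starts
         .getOrCreate())

# ... run the df / count / rdd.map snippet shown above ...

spark.sparkContext.show_profiles()   # prints ncalls/tottime/cumtime tables like those below
```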

Before
```
310274584 function calls (300272456 primitive calls) in 1320.684 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 10000000  253.417    0.000  486.991    0.000 types.py:619(<listcomp>)
 30000000  192.272    0.000 1009.986    0.000 types.py:612(fromInternal)
100000000  176.140    0.000  176.140    0.000 types.py:88(fromInternal)
 20000000  156.832    0.000  328.093    0.000 types.py:1471(_create_row)
    14000  107.206    0.008 1237.917    0.088 {built-in method loads}
 20000000   80.176    0.000 1090.162    0.000 types.py:1468(<lambda>)
```

After
```
210274584 function calls (200272456 primitive calls) in 1035.974 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 30000000  215.845    0.000  698.748    0.000 types.py:612(fromInternal)
 20000000  165.042    0.000  351.572    0.000 types.py:1471(_create_row)
    14000  116.834    0.008  946.791    0.068 {built-in method loads}
 20000000   87.326    0.000  786.073    0.000 types.py:1468(<lambda>)
 20000000   85.477    0.000  134.607    0.000 types.py:1519(__new__)
 10000000   65.777    0.000  126.712    0.000 types.py:619(<listcomp>)
```

The main differences are in types.py:619(&lt;listcomp&gt;) and types.py:88(fromInternal), which disappears entirely in the After profile.
There are 100 million fewer function calls, and performance improves by about 20%.
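A quick arithmetic check of those two claims against the numbers in the profiles:

```
# Figures taken directly from the Before/After profiles above.
before_calls, after_calls = 310274584, 210274584
before_time, after_time = 1320.684, 1035.974
print(before_calls - after_calls)     # 100000000 -> 100 million fewer calls
print(1 - after_time / before_time)   # ~0.216 -> roughly 20% faster
```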

Benchmark (worst-case scenario)

Test
```
df = spark.range(1000000).selectExpr("current_timestamp as id0", "current_timestamp as id1", "current_timestamp as id2", "current_timestamp as id3", "current_timestamp as id4", "current_timestamp as id5", "current_timestamp as id6", "current_timestamp as id7", "current_timestamp as id8", "current_timestamp as id9").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
31166064 function calls (31163984 primitive calls) in 150.882 seconds
```

After
```
31166064 function calls (31163984 primitive calls) in 153.220 seconds
```
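Every column here is a timestamp, so every field still needs conversion: the precalculated mask cannot skip any calls, and the small difference is the cost of the extra per-field check.

```
# Figures taken from the worst-case Before/After runs above.
before, after = 150.882, 153.220
print((after - before) / before)   # ~0.015 -> about a 1.5% slowdown in the worst case
```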

IMPORTANT:
The benchmarks were run on top of #19246.
Without #19246 the performance improvement would be even greater.

## How was this patch tested?

Existing tests.
Performance benchmark.

Author: Maciej Bryński <maciek-github@brynski.pl>

Closes #19249 from maver1ck/spark_22032.
maver1ck authored and HyukjinKwon committed Sep 17, 2017
1 parent 73d9067 commit f407302
1 changed file: python/pyspark/sql/types.py (16 additions, 6 deletions)
```
@@ -483,7 +483,9 @@ def __init__(self, fields=None):
             self.names = [f.name for f in fields]
             assert all(isinstance(f, StructField) for f in fields),\
                 "fields should be a list of StructField"
-        self._needSerializeAnyField = any(f.needConversion() for f in self)
+        # Precalculated list of fields that need conversion with fromInternal/toInternal functions
+        self._needConversion = [f.needConversion() for f in self]
+        self._needSerializeAnyField = any(self._needConversion)
 
     def add(self, field, data_type=None, nullable=True, metadata=None):
         """
@@ -528,7 +530,9 @@ def add(self, field, data_type=None, nullable=True, metadata=None):
                 data_type_f = data_type
             self.fields.append(StructField(field, data_type_f, nullable, metadata))
             self.names.append(field)
-        self._needSerializeAnyField = any(f.needConversion() for f in self)
+        # Precalculated list of fields that need conversion with fromInternal/toInternal functions
+        self._needConversion = [f.needConversion() for f in self]
+        self._needSerializeAnyField = any(self._needConversion)
         return self
 
     def __iter__(self):
@@ -590,13 +594,17 @@ def toInternal(self, obj):
             return
 
         if self._needSerializeAnyField:
+            # Only calling toInternal function for fields that need conversion
             if isinstance(obj, dict):
-                return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields))
+                return tuple(f.toInternal(obj.get(n)) if c else obj.get(n)
+                             for n, f, c in zip(self.names, self.fields, self._needConversion))
             elif isinstance(obj, (tuple, list)):
-                return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
+                return tuple(f.toInternal(v) if c else v
+                             for f, v, c in zip(self.fields, obj, self._needConversion))
             elif hasattr(obj, "__dict__"):
                 d = obj.__dict__
-                return tuple(f.toInternal(d.get(n)) for n, f in zip(self.names, self.fields))
+                return tuple(f.toInternal(d.get(n)) if c else d.get(n)
+                             for n, f, c in zip(self.names, self.fields, self._needConversion))
             else:
                 raise ValueError("Unexpected tuple %r with StructType" % obj)
         else:
@@ -619,7 +627,9 @@ def fromInternal(self, obj):
             # it's already converted by pickler
             return obj
         if self._needSerializeAnyField:
-            values = [f.fromInternal(v) for f, v in zip(self.fields, obj)]
+            # Only calling fromInternal function for fields that need conversion
+            values = [f.fromInternal(v) if c else v
+                      for f, v, c in zip(self.fields, obj, self._needConversion)]
         else:
             values = obj
         return _create_row(self.names, values)
```
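For illustration, a small standalone example of the precalculated mask (the column types here are chosen for the example and are not part of the patch): `LongType` needs no conversion, while `TimestampType` does, so only the timestamp field pays for a `fromInternal` call per record.

```
from pyspark.sql.types import StructType, StructField, LongType, TimestampType

schema = StructType([
    StructField("id", LongType()),        # needConversion() -> False
    StructField("ts", TimestampType()),   # needConversion() -> True
])

# This is what the patched __init__ precomputes into _needConversion.
print([f.needConversion() for f in schema])   # [False, True]
# During fromInternal, only the "ts" value goes through f.fromInternal(v);
# the "id" value is passed through unchanged.
```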
