[SPARK-6055] [PySpark] fix incorrect __eq__ of DataType
The __eq__ of DataType is not correct: the class cache is not used correctly (an already-created class cannot be found again by its dataType), so lots of classes are created (saved in _cached_cls) and never released.

Also, all instances of the same DataType share the same hash code, so a dict keyed by DataType ends up with many objects under one hash code (effectively a hash-collision attack), which makes access to that dict very slow (depending on the implementation of CPython).
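
A minimal sketch (not the PySpark implementation; BadType, GoodType and get_row_class are made-up names) of the two problems described above: an __eq__ that only matches the identical instance defeats a cache keyed by the data type, and a constant __hash__ puts every key into the same bucket, so dict lookups degrade into linear scans.

class BadType(object):
    def __hash__(self):
        return 0                  # every instance lands in the same hash bucket
    def __eq__(self, other):
        return self is other      # a re-created instance never compares equal

_cached_cls = {}

def get_row_class(dt):
    # With BadType keys this lookup always misses, so a new class is created
    # and cached on every call -- the unbounded growth described above.
    if dt not in _cached_cls:
        _cached_cls[dt] = type("Row_%d" % len(_cached_cls), (tuple,), {})
    return _cached_cls[dt]

The fix is to make equality structural and hashing consistent with it, for example:

class GoodType(object):
    def __eq__(self, other):
        return isinstance(other, self.__class__)
    def __hash__(self):
        return hash(self.__class__)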

This PR also improves the performance of inferSchema (by avoiding unnecessary conversion of objects).
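
A hypothetical illustration of that idea (build_converter, needs_conversion and field_converters are made-up names, not the actual inferSchema code): build a per-object converter only when the inferred schema actually requires one, and otherwise pass rows through unchanged.

def build_converter(needs_conversion, field_converters):
    if not needs_conversion:
        return lambda obj: obj    # identity: no per-row work at all
    return lambda obj: tuple(c(v) for c, v in zip(field_converters, obj))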

cc pwendell  JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #4808 from davies/leak and squashes the following commits:

6a322a4 [Davies Liu] tests refactor
3da44fc [Davies Liu] fix __eq__ of Singleton
534ac90 [Davies Liu] add more checks
46999dc [Davies Liu] fix tests
d9ae973 [Davies Liu] fix memory leak in sql
Davies Liu authored and JoshRosen committed Feb 28, 2015
1 parent 8c468a6 commit e0e64ba
Showing 4 changed files with 86 additions and 137 deletions.
90 changes: 1 addition & 89 deletions python/pyspark/sql/context.py
@@ -17,15 +17,14 @@

import warnings
import json
from array import array
from itertools import imap

from py4j.protocol import Py4JError
from py4j.java_collections import MapConverter

from pyspark.rdd import RDD, _prepare_for_python_RDD
from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
from pyspark.sql.types import StringType, StructType, _verify_type, \
from pyspark.sql.types import Row, StringType, StructType, _verify_type, \
_infer_schema, _has_nulltype, _merge_type, _create_converter, _python_to_sql_converter
from pyspark.sql.dataframe import DataFrame

@@ -620,93 +619,6 @@ def _get_hive_ctx(self):
        return self._jvm.HiveContext(self._jsc.sc())


def _create_row(fields, values):
    row = Row(*values)
    row.__FIELDS__ = fields
    return row


class Row(tuple):

    """
    A row in L{DataFrame}. The fields in it can be accessed like attributes.

    Row can be used to create a row object by using named arguments,
    the fields will be sorted by names.

    >>> row = Row(name="Alice", age=11)
    >>> row
    Row(age=11, name='Alice')
    >>> row.name, row.age
    ('Alice', 11)

    Row also can be used to create another Row like class, then it
    could be used to create Row objects, such as

    >>> Person = Row("name", "age")
    >>> Person
    <Row(name, age)>
    >>> Person("Alice", 11)
    Row(name='Alice', age=11)
    """

    def __new__(self, *args, **kwargs):
        if args and kwargs:
            raise ValueError("Can not use both args "
                             "and kwargs to create Row")
        if args:
            # create row class or objects
            return tuple.__new__(self, args)

        elif kwargs:
            # create row objects
            names = sorted(kwargs.keys())
            values = tuple(kwargs[n] for n in names)
            row = tuple.__new__(self, values)
            row.__FIELDS__ = names
            return row

        else:
            raise ValueError("No args or kwargs")

    def asDict(self):
        """
        Return as a dict
        """
        if not hasattr(self, "__FIELDS__"):
            raise TypeError("Cannot convert a Row class into dict")
        return dict(zip(self.__FIELDS__, self))

    # let the object act like a class
    def __call__(self, *args):
        """create new Row object"""
        return _create_row(self, args)

    def __getattr__(self, item):
        if item.startswith("__"):
            raise AttributeError(item)
        try:
            # it will be slow when it has many fields,
            # but this will not be used in normal cases
            idx = self.__FIELDS__.index(item)
            return self[idx]
        except ValueError:
            raise AttributeError(item)

    def __reduce__(self):
        if hasattr(self, "__FIELDS__"):
            return (_create_row, (self.__FIELDS__, tuple(self)))
        else:
            return tuple.__reduce__(self)

    def __repr__(self):
        if hasattr(self, "__FIELDS__"):
            return "Row(%s)" % ", ".join("%s=%r" % (k, v)
                                         for k, v in zip(self.__FIELDS__, self))
        else:
            return "<Row(%s)>" % ", ".join(self)


def _test():
    import doctest
    from pyspark.context import SparkContext
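The Row class removed here is now imported from pyspark.sql.types instead (see the updated import at the top of this diff), so user code that builds rows keeps working; for example:

from pyspark.sql.types import Row

Person = Row("name", "age")
print(Person("Alice", 11))   # prints Row(name='Alice', age=11)
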
4 changes: 3 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -1025,10 +1025,12 @@ def cast(self, dataType):
            ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc())
            jdt = ssql_ctx.parseDataType(dataType.json())
            jc = self._jc.cast(jdt)
        else:
            raise TypeError("unexpected type: %s" % type(dataType))
        return Column(jc)

    def __repr__(self):
        return 'Column<%s>' % self._jdf.toString().encode('utf8')
        return 'Column<%s>' % self._jc.toString().encode('utf8')


def _test():
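With this change Column.cast accepts either a type name string or a DataType instance and raises TypeError for anything else. A usage sketch (assuming a DataFrame df with an integer column age):

from pyspark.sql.types import StringType

df.age.cast("string")        # cast by type name
df.age.cast(StringType())    # cast by DataType instance
df.age.cast(42)              # raises TypeError: unexpected type: <type 'int'>
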
9 changes: 9 additions & 0 deletions python/pyspark/sql/tests.py
@@ -24,6 +24,7 @@
import pydoc
import shutil
import tempfile
import pickle

import py4j

@@ -88,6 +89,14 @@ def __eq__(self, other):
            other.x == self.x and other.y == self.y


class DataTypeTests(unittest.TestCase):
    # regression test for SPARK-6055
    def test_data_type_eq(self):
        lt = LongType()
        lt2 = pickle.loads(pickle.dumps(LongType()))
        self.assertEquals(lt, lt2)


class SQLTests(ReusedPySparkTestCase):

    @classmethod
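Why the regression test above matters: a pickle round trip produces a distinct LongType instance, and caches keyed by DataType (such as _cached_cls) can only find it again if __eq__ and __hash__ treat it as the same key. A quick illustration of that lookup (the cache dict here is made up):

import pickle
from pyspark.sql.types import LongType

cache = {LongType(): "cached value"}
restored = pickle.loads(pickle.dumps(LongType()))
print(cache.get(restored))   # "cached value" with the fixed __eq__/__hash__; a miss (None) before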