From 610cb3f3cb5713e9e733ccc36fdc197eae7f4fe5 Mon Sep 17 00:00:00 2001 From: 0x0FFF Date: Tue, 1 Sep 2015 05:30:09 -0700 Subject: [PATCH 1/3] [SPARK-10162] [SQL] Fix the timezone omitting for PySpark Dataframe filter function --- python/pyspark/sql/tests.py | 26 ++++++++++++++++++-------- python/pyspark/sql/types.py | 5 +++-- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index cd32e26c64f22..59a891bd7c420 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -50,16 +50,17 @@ from pyspark.sql.utils import AnalysisException, IllegalArgumentException -class UTC(datetime.tzinfo): - """UTC""" - ZERO = datetime.timedelta(0) +class UTCOffsetTimezone(datetime.tzinfo): + """ + Specifies timezone in UTC offset + """ + + def __init__(self, offset=0): + self.ZERO = datetime.timedelta(hours=offset) def utcoffset(self, dt): return self.ZERO - def tzname(self, dt): - return "UTC" - def dst(self, dt): return self.ZERO @@ -841,13 +842,22 @@ def test_filter_with_datetime(self): self.assertEqual(0, df.filter(df.date > date).count()) self.assertEqual(0, df.filter(df.time > time).count()) + def test_filter_with_datetime_timezone(self): + dt1 = datetime.datetime(2015, 4, 17, 23, 1, 2, 3000, tzinfo=UTCOffsetTimezone(0)) + dt2 = datetime.datetime(2015, 4, 17, 23, 1, 2, 3000, tzinfo=UTCOffsetTimezone(1)) + row = Row(date=dt1) + df = self.sqlCtx.createDataFrame([row]) + self.assertEqual(0, df.filter(df.date == dt2).count()) + self.assertEqual(1, df.filter(df.date > dt2).count()) + self.assertEqual(0, df.filter(df.date < dt2).count()) + def test_time_with_timezone(self): day = datetime.date.today() now = datetime.datetime.now() ts = time.mktime(now.timetuple()) # class in __main__ is not serializable - from pyspark.sql.tests import UTC - utc = UTC() + from pyspark.sql.tests import UTCOffsetTimezone + utc = UTCOffsetTimezone() utcnow = datetime.datetime.utcfromtimestamp(ts) # without microseconds # add microseconds to utcnow (keeping year,month,day,hour,minute,second) utcnow = datetime.datetime(*(utcnow.timetuple()[:6] + (now.microsecond, utc))) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 94e581a78364c..d9979ba948d89 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1290,8 +1290,9 @@ def can_convert(self, obj): def convert(self, obj, gateway_client): Timestamp = JavaClass("java.sql.Timestamp", gateway_client) - return Timestamp(int(time.mktime(obj.timetuple())) * 1000 + obj.microsecond // 1000) - + seconds = (calendar.timegm(obj.utctimetuple()) if obj.tzinfo + else time.mktime(obj.timetuple())) + return Timestamp(int(seconds) * 1000 + obj.microsecond // 1000) # datetime is a subclass of date, we should register DatetimeConverter first register_input_converter(DatetimeConverter()) From cd63eb0e0804942300d4243ed8d88a0fa4bdadc5 Mon Sep 17 00:00:00 2001 From: 0x0FFF Date: Tue, 1 Sep 2015 11:22:14 -0700 Subject: [PATCH 2/3] Fixing indentation --- python/pyspark/sql/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index d9979ba948d89..19beeaa4aa327 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1291,7 +1291,7 @@ def can_convert(self, obj): def convert(self, obj, gateway_client): Timestamp = JavaClass("java.sql.Timestamp", gateway_client) seconds = (calendar.timegm(obj.utctimetuple()) if obj.tzinfo - else time.mktime(obj.timetuple())) + else time.mktime(obj.timetuple())) return Timestamp(int(seconds) * 1000 + obj.microsecond // 1000) # datetime is a subclass of date, we should register DatetimeConverter first From 2acd285a21441357b12ba748ae0a46176fd5424f Mon Sep 17 00:00:00 2001 From: 0x0FFF Date: Tue, 1 Sep 2015 12:18:03 -0700 Subject: [PATCH 3/3] [SPARK-10162] [SQL] Using setNanos call to set microseconds --- python/pyspark/sql/types.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 19beeaa4aa327..f84d08d7098ad 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1292,7 +1292,9 @@ def convert(self, obj, gateway_client): Timestamp = JavaClass("java.sql.Timestamp", gateway_client) seconds = (calendar.timegm(obj.utctimetuple()) if obj.tzinfo else time.mktime(obj.timetuple())) - return Timestamp(int(seconds) * 1000 + obj.microsecond // 1000) + t = Timestamp(int(seconds) * 1000) + t.setNanos(obj.microsecond * 1000) + return t # datetime is a subclass of date, we should register DatetimeConverter first register_input_converter(DatetimeConverter())