Skip to content

Commit

Permalink
[SPARK-10162] [SQL] Fix the timezone omitting for PySpark Dataframe f…
Browse files Browse the repository at this point in the history
…ilter function

This PR addresses [SPARK-10162](https://issues.apache.org/jira/browse/SPARK-10162)
The issue is with DataFrame filter() function, if datetime.datetime is passed to it:
* Timezone information of this datetime is ignored
* This datetime is assumed to be in local timezone, which depends on the OS timezone setting

Fix includes both code change and regression test. Problem reproduction code on master:
```python
import pytz
from datetime import datetime
from pyspark.sql import *
from pyspark.sql.types import *
sqc = SQLContext(sc)
df = sqc.createDataFrame([], StructType([StructField("dt", TimestampType())]))

m1 = pytz.timezone('UTC')
m2 = pytz.timezone('Etc/GMT+3')

df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m1)).explain()
df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m2)).explain()
```
It gives the same timestamp ignoring time zone:
```
>>> df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m1)).explain()
Filter (dt#0 > 946713600000000)
 Scan PhysicalRDD[dt#0]

>>> df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m2)).explain()
Filter (dt#0 > 946713600000000)
 Scan PhysicalRDD[dt#0]
```
After the fix:
```
>>> df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m1)).explain()
Filter (dt#0 > 946684800000000)
 Scan PhysicalRDD[dt#0]

>>> df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m2)).explain()
Filter (dt#0 > 946695600000000)
 Scan PhysicalRDD[dt#0]
```
PR [8536](#8536) was occasionally closed by me dropping the repo

Author: 0x0FFF <programmerag@gmail.com>

Closes #8555 from 0x0FFF/SPARK-10162.
  • Loading branch information
0x0FFF authored and davies committed Sep 1, 2015
1 parent ec01280 commit bf550a4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
26 changes: 18 additions & 8 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,17 @@
from pyspark.sql.utils import AnalysisException, IllegalArgumentException


class UTC(datetime.tzinfo):
"""UTC"""
ZERO = datetime.timedelta(0)
class UTCOffsetTimezone(datetime.tzinfo):
"""
Specifies timezone in UTC offset
"""

def __init__(self, offset=0):
self.ZERO = datetime.timedelta(hours=offset)

def utcoffset(self, dt):
return self.ZERO

def tzname(self, dt):
return "UTC"

def dst(self, dt):
return self.ZERO

Expand Down Expand Up @@ -841,13 +842,22 @@ def test_filter_with_datetime(self):
self.assertEqual(0, df.filter(df.date > date).count())
self.assertEqual(0, df.filter(df.time > time).count())

def test_filter_with_datetime_timezone(self):
dt1 = datetime.datetime(2015, 4, 17, 23, 1, 2, 3000, tzinfo=UTCOffsetTimezone(0))
dt2 = datetime.datetime(2015, 4, 17, 23, 1, 2, 3000, tzinfo=UTCOffsetTimezone(1))
row = Row(date=dt1)
df = self.sqlCtx.createDataFrame([row])
self.assertEqual(0, df.filter(df.date == dt2).count())
self.assertEqual(1, df.filter(df.date > dt2).count())
self.assertEqual(0, df.filter(df.date < dt2).count())

def test_time_with_timezone(self):
day = datetime.date.today()
now = datetime.datetime.now()
ts = time.mktime(now.timetuple())
# class in __main__ is not serializable
from pyspark.sql.tests import UTC
utc = UTC()
from pyspark.sql.tests import UTCOffsetTimezone
utc = UTCOffsetTimezone()
utcnow = datetime.datetime.utcfromtimestamp(ts) # without microseconds
# add microseconds to utcnow (keeping year,month,day,hour,minute,second)
utcnow = datetime.datetime(*(utcnow.timetuple()[:6] + (now.microsecond, utc)))
Expand Down
7 changes: 5 additions & 2 deletions python/pyspark/sql/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,8 +1290,11 @@ def can_convert(self, obj):

def convert(self, obj, gateway_client):
Timestamp = JavaClass("java.sql.Timestamp", gateway_client)
return Timestamp(int(time.mktime(obj.timetuple())) * 1000 + obj.microsecond // 1000)

seconds = (calendar.timegm(obj.utctimetuple()) if obj.tzinfo
else time.mktime(obj.timetuple()))
t = Timestamp(int(seconds) * 1000)
t.setNanos(obj.microsecond * 1000)
return t

# datetime is a subclass of date, we should register DatetimeConverter first
register_input_converter(DatetimeConverter())
Expand Down

0 comments on commit bf550a4

Please sign in to comment.