
[SPARK-22814][SQL] Support Date/Timestamp in a JDBC partition column #21834

Closed · wants to merge 4 commits

Conversation

@maropu (Member) commented Jul 21, 2018

What changes were proposed in this pull request?

This PR adds support for Date/Timestamp types in a JDBC partition column (currently only numeric columns are supported in master). It also modifies the code to verify the partition column type:

```
val jdbcTable = spark.read
  .option("partitionColumn", "text")
  .option("lowerBound", "aaa")
  .option("upperBound", "zzz")
  .option("numPartitions", 2)
  .jdbc("jdbc:postgresql:postgres", "t", options)

// with this pr
org.apache.spark.sql.AnalysisException: Partition column type should be numeric, date, or timestamp, but string found.;
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.verifyAndGetNormalizedPartitionColumn(JDBCRelation.scala:165)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.columnPartition(JDBCRelation.scala:85)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:317)

// without this pr
java.lang.NumberFormatException: For input string: "aaa"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Long.parseLong(Long.java:589)
  at java.lang.Long.parseLong(Long.java:631)
  at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
```
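With this change, the bounds can instead be supplied as date or timestamp strings when the partition column has a date/timestamp type. A minimal usage sketch, assuming a hypothetical table `t` with a DATE column `d`:

```
// Sketch only: table "t" and DATE column "d" are hypothetical.
// lowerBound/upperBound strings are parsed with java.sql.Date.valueOf.
val df = spark.read
  .option("partitionColumn", "d")
  .option("lowerBound", "2018-01-01")
  .option("upperBound", "2018-12-31")
  .option("numPartitions", 4)
  .jdbc("jdbc:postgresql:postgres", "t", options)
```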

Closes #19999

How was this patch tested?

Added tests in `JDBCSuite`.

@SparkQA commented Jul 21, 2018

Test build #93381 has finished for PR 21834 at commit 87f9be3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member, Author) commented Jul 21, 2018

@gatorsmile

@maropu (Member, Author) commented Jul 24, 2018

ping

Review comment on this test snippet:

```
case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) =>
  val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet
  assert(whereClauses === Set(
    """"T" < '2018-07-15 20:50:32.5' or "T" is null""",
```
Member:

Add a test case to OracleIntegrationSuite.scala? I am afraid this does not work.

@maropu (Member, Author):

ok

@gatorsmile (Member):
@maropu Do we have a log message that lets users see the generated WHERE clauses? If not, could you add one?

@maropu (Member, Author) commented Jul 25, 2018

Currently, no. Is it OK if the log level is INFO?
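For reference, a minimal sketch of what such an INFO log might look like inside `JDBCRelation` (hedged: `logInfo` comes from Spark's internal `Logging` trait, and the exact message in the merged code may differ):

```
// Hypothetical sketch: surface the generated partition predicates at INFO level.
logInfo(s"Number of partitions: $numPartitions, WHERE clauses of these partitions: " +
  parts.map(_.asInstanceOf[JDBCPartition].whereClause).mkString(", "))
```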

@SparkQA commented Jul 25, 2018

Test build #93532 has finished for PR 21834 at commit 1041a38.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member, Author) commented Jul 25, 2018

retest this please

@SparkQA commented Jul 25, 2018

Test build #93537 has finished for PR 21834 at commit 1041a38.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 25, 2018

Test build #93542 has finished for PR 21834 at commit 577f66e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member) commented Jul 30, 2018

My only concern is time zone issues. However, they will not affect correctness, since the logical partitioning covers the whole range anyway. The worst case is that the partitioning might not start and end at the expected boundaries due to local/remote time zone differences, if any exist.

Since this is a new feature, we can address the issues and improve the solution in the following releases.

@gatorsmile (Member):
retest this please

@gatorsmile (Member):
LGTM pending tests

@SparkQA commented Jul 30, 2018

Test build #93778 has finished for PR 21834 at commit 577f66e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member):
Thanks! Merged to master.

@asfgit closed this in 47d84e4 on Jul 30, 2018
@maropu (Member, Author) commented Jul 31, 2018

Thanks for the merge!

robert3005 pushed a commit to palantir/spark that referenced this pull request on Jul 31, 2018.
Review comment on this snippet from `JDBCRelation.scala`:

```
private def toInternalBoundValue(value: String, columnType: DataType): Long = columnType match {
  case _: NumericType => value.toLong
  // Dates are stored internally as days since the epoch.
  case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value)).toLong
  // Timestamps are stored internally as microseconds since the epoch.
  case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value))
}
```

Member:
@maropu The Timestamp.valueOf method expects timestamps in the format yyyy-[m]m-[d]d hh:mm:ss[.f...]. Was it selected intentionally, or just because it is the default pattern for Timestamp?

For example, it cannot parse time zones as Cast can:

```
scala> Timestamp.valueOf("1973-02-27 02:30:00.102030+01:00")
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
  at java.sql.Timestamp.valueOf(Timestamp.java:251)
```

@maropu (Member, Author) commented Jan 17, 2019

I think it'd be nice to accept more general patterns for date/timestamp. Can you make a PR to fix that?

Member:

Yes, I am going to prepare a PR for that.

@maropu (Member, Author):

Thanks!

@AlJohri commented May 4, 2019

@gatorsmile @maropu this currently does not work with pyspark, due to this line in `readwriter.py`:

```
return self._df(self._jreader.jdbc(url, table, column, int(lowerBound), int(upperBound),
                                   int(numPartitions), jprop))
```

It tries to convert `lowerBound` and `upperBound` to an `int`.

The resulting traceback is:

```
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-40-2636f0dd1e0a> in <module>
     16                         upperBound=now,
     17                         numPartitions=sc.defaultParallelism,
---> 18                         properties={'driver': 'org.postgresql.Driver'})
     19                    .join(article_metadata, on=['url'], how='left')
     20                    .orderBy('timestamp', ascending=False))

/usr/lib/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, column, lowerBound, upperBound, numPartitions, predicates, properties)
    550             assert numPartitions is not None, \
    551                 "numPartitions can not be None when ``column`` is specified"
--> 552             return self._df(self._jreader.jdbc(url, table, column, int(lowerBound), int(upperBound),
    553                                                int(numPartitions), jprop))
    554         if predicates is not None:

TypeError: int() argument must be a string, a bytes-like object or a number, not 'datetime.datetime'
```

I think just removing the `int()` call may fix the issue, but I'm not 100% sure.

EDIT: it looks like the pyspark docs explicitly say to use an integer column at the moment.
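As a point of comparison, the option-based reader path passes all bounds through as strings (and the same `.option` calls work from pyspark), so it avoids the `int()` coercion. A hedged sketch with hypothetical connection details:

```
// Sketch only: the URL, table, and column names here are placeholders.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:postgres")
  .option("dbtable", "mytable")
  .option("partitionColumn", "mypartitioncol")
  .option("lowerBound", "2019-04-01 00:00:00")
  .option("upperBound", "2019-05-01 00:00:00")
  .option("numPartitions", 4)
  .load()
```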

@shatestest:
When I chose INSERTION_DATE as my partitionColumn with the bounds below:

```
.option("lowerBound", "2002-03-31");
.option("upperBound", "2019-05-01");
.option("dateFormat", "yyyy-mm-dd");
```

I get the error ORA-01861: literal does not match format string. How do I pass the dates for lowerBound/upperBound?

@MaxGekk (Member) commented May 7, 2019

@shatestest mm means minute-of-hour; see https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html. I guess you need to replace mm with MM.
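To illustrate the pattern-letter difference, a small standalone sketch using java.time directly (not specific to the Spark option):

```
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// "MM" is month-of-year, so this parses successfully:
val d = LocalDate.parse("2002-03-31", DateTimeFormatter.ofPattern("yyyy-MM-dd"))

// "mm" is minute-of-hour; with "yyyy-mm-dd" the month field is never populated,
// so the line below would throw java.time.format.DateTimeParseException:
// LocalDate.parse("2002-03-31", DateTimeFormatter.ofPattern("yyyy-mm-dd"))
```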

@shatestest:
@MaxGekk thanks for the reply, but I get the same error with .option("dateFormat", "yyyy-MM-dd").

@shatestest:
@SparkQA, may I know how you tested partitionColumn for date/timestamp? Any sample test cases?

@MaxGekk (Member) commented May 7, 2019

There are tests in the PR; see https://github.com/apache/spark/pull/21834/files#diff-5e0cadf526662f9281aa26315b3750adR442. @shatestest Could you open a JIRA ticket and provide a reproducible example, please?

@AlJohri commented May 7, 2019

If anyone finds themselves here looking to do this in pyspark: until support for this feature is added, here is a workaround:

```
import datetime
from itertools import tee

# Split [start, end] into intv evenly sized sub-ranges.
def date_range(start, end, intv):
    diff = (end - start) / intv
    for i in range(intv):
        yield start + diff * i
    yield end

# Yield consecutive pairs: (s0, s1), (s1, s2), ...
def pairwise(iterable):
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

partition_column = 'mypartitioncol'
now = datetime.datetime.now(datetime.timezone.utc)
num_partitions = sc.defaultParallelism
lower_bound = now + datetime.timedelta(-30)
upper_bound = now

# Build one non-overlapping predicate per partition.
predicates = []
for start, end in pairwise(date_range(lower_bound, upper_bound, num_partitions)):
    predicates.append(f"{partition_column} >= '{start.isoformat()}' AND {partition_column} < '{end.isoformat()}'")

df = (spark.read.jdbc(
            url='myjdbcuri',
            table='mytable',
            predicates=predicates,
            properties={'driver': 'org.postgresql.Driver'}))
```

Associated JIRA comment.

@shatestest:
@MaxGekk sorry for the delay. I raised a ticket at https://issues.apache.org/jira/browse/SPARK-27723; please let me know if you need any more info.
