[DOP-13853] Update MongoDB package to 10.2.3
dolfinus committed Apr 26, 2024
1 parent 8b5202a commit c09500e
Showing 10 changed files with 81 additions and 28 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/data/clickhouse/matrix.yml
@@ -25,16 +25,16 @@ matrix:
       clickhouse-version: 23.6.1-alpine
       <<: *max
   full:
-    # the lowest supported Clickhouse version by JDBC driver
+    # Clickhouse version with proper DateTime > DateTime64 comparison
     - clickhouse-image: yandex/clickhouse-server
-      clickhouse-version: '20.7'
+      clickhouse-version: '21.1'
       <<: *min
     - clickhouse-image: clickhouse/clickhouse-server
       clickhouse-version: 23.6.1-alpine
       <<: *max
   nightly:
     - clickhouse-image: yandex/clickhouse-server
-      clickhouse-version: '20.7'
+      clickhouse-version: '21.1'
       <<: *min
     - clickhouse-image: clickhouse/clickhouse-server
       clickhouse-version: latest-alpine
2 changes: 1 addition & 1 deletion docs/changelog/next_release/255.feature.rst
@@ -1 +1 @@
-:class:`MongoDB` connection now uses MongoDB Spark connector ``10.2.2``, upgraded from ``10.1.1``, and supports passing custom versions: ``MongoDB.get_packages(scala_version=..., package_version=...)``.
+:class:`MongoDB` connection now uses MongoDB Spark connector ``10.2.3``, upgraded from ``10.1.1``, and supports passing custom versions: ``MongoDB.get_packages(scala_version=..., package_version=...)``.
26 changes: 26 additions & 0 deletions docs/changelog/next_release/267.breaking.rst
@@ -0,0 +1,26 @@
+Serialize ``DateTimeHWM`` to Clickhouse's ``DateTime64(6)`` (precision up to microseconds) instead of ``DateTime`` (precision up to seconds).
+
+For Clickhouse below 21.1, comparing a column of type ``DateTime`` with a value of type ``DateTime64`` was not supported, returning an empty dataframe.
+To avoid this, replace:
+
+.. code:: python
+
+    DBReader(
+        ...,
+        hwm=DBReader.AutoDetectHWM(
+            name="my_hwm",
+            expression="hwm_column",  # <--
+        ),
+    )
+
+with:
+
+.. code:: python
+
+    DBReader(
+        ...,
+        hwm=DBReader.AutoDetectHWM(
+            name="my_hwm",
+            expression="CAST(hwm_column AS DateTime64)",  # <--
+        ),
+    )
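For illustration, here is a minimal Python sketch of the comparison this breaking change is about. The WHERE-clause shape is an assumption for illustration only (it is not taken from the source); the serialized literal follows the dialect change later in this commit:

from datetime import datetime

# Hypothetical sketch: shape of the incremental filter once the HWM value is
# serialized with microsecond precision. The exact clause onetl generates is
# an assumption here; "hwm_column" comes from the changelog example above.
hwm_value = datetime(2024, 4, 26, 10, 30, 0, 123456)
literal = f"toDateTime64('{hwm_value.strftime('%Y-%m-%d %H:%M:%S.%f')}', 6)"

# On Clickhouse < 21.1, comparing a plain DateTime column against this
# DateTime64 literal was not supported (empty dataframe), hence the CAST:
where_clause = f"CAST(hwm_column AS DateTime64) > {literal}"
print(where_clause)
# CAST(hwm_column AS DateTime64) > toDateTime64('2024-04-26 10:30:00.123456', 6)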
2 changes: 1 addition & 1 deletion docs/connection/db_connection/clickhouse/prerequisites.rst
@@ -6,7 +6,7 @@ Prerequisites
 Version Compatibility
 ---------------------
 
-* Clickhouse server versions: 20.7 or higher
+* Clickhouse server versions: 21.1 or higher
 * Spark versions: 2.3.x - 3.5.x
 * Java versions: 8 - 20
 
4 changes: 2 additions & 2 deletions docs/connection/db_connection/mongodb/types.rst
@@ -73,8 +73,8 @@ References
 
 Here you can find source code with type conversions:
 
-* `MongoDB -> Spark <https://github.com/mongodb/mongo-spark/blob/r10.1.1/src/main/java/com/mongodb/spark/sql/connector/schema/InferSchema.java#L121-L170>`_
-* `Spark -> MongoDB <https://github.com/mongodb/mongo-spark/blob/r10.1.1/src/main/java/com/mongodb/spark/sql/connector/schema/RowToBsonDocumentConverter.java#L117-L200>`_
+* `MongoDB -> Spark <https://github.com/mongodb/mongo-spark/blob/r10.2.3/src/main/java/com/mongodb/spark/sql/connector/schema/InferSchema.java#L130-L176>`_
+* `Spark -> MongoDB <https://github.com/mongodb/mongo-spark/blob/r10.2.3/src/main/java/com/mongodb/spark/sql/connector/schema/RowToBsonDocumentConverter.java#L156-L252>`_
 
 Supported types
 ---------------
8 changes: 5 additions & 3 deletions onetl/connection/db_connection/clickhouse/dialect.py
@@ -26,9 +26,11 @@ def get_min_value(self, value: Any) -> str:
         return f"minOrNull({result})"
 
     def _serialize_datetime(self, value: datetime) -> str:
-        result = value.strftime("%Y-%m-%d %H:%M:%S")
-        return f"CAST('{result}' AS DateTime)"
+        # this requires at least Clickhouse 21.1, see:
+        # https://github.com/ClickHouse/ClickHouse/issues/16655
+        result = value.strftime("%Y-%m-%d %H:%M:%S.%f")
+        return f"toDateTime64('{result}', 6)"
 
     def _serialize_date(self, value: date) -> str:
         result = value.strftime("%Y-%m-%d")
-        return f"CAST('{result}' AS Date)"
+        return f"toDate('{result}')"
16 changes: 8 additions & 8 deletions onetl/connection/db_connection/mongodb/connection.py
@@ -50,8 +50,8 @@ class Config:
 class MongoDB(DBConnection):
     """MongoDB connection. |support_hooks|
 
-    Based on package ``org.mongodb.spark:mongo-spark-connector:10.1.1``
-    (`MongoDB connector for Spark <https://www.mongodb.com/docs/spark-connector/v10.1/>`_)
+    Based on package `org.mongodb.spark:mongo-spark-connector:10.2.3 <https://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector_2.12/10.2.3>`_
+    (`MongoDB connector for Spark <https://www.mongodb.com/docs/spark-connector/v10.2/>`_)
 
     .. warning::

@@ -149,7 +149,7 @@ def get_packages(
             Spark version in format ``major.minor``. Used only if ``scala_version=None``.
 
         package_version : str, optional
-            Specifies the version of the MongoDB Spark connector to use. Defaults to ``10.2.2``.
+            Specifies the version of the MongoDB Spark connector to use. Defaults to ``10.2.3``.
 
         Examples
         --------

@@ -160,10 +160,10 @@ def get_packages(
             MongoDB.get_packages(scala_version="2.12")
 
             # specify custom connector version
-            MongoDB.get_packages(scala_version="2.12", package_version="10.2.2")
+            MongoDB.get_packages(scala_version="2.12", package_version="10.2.3")
 
         """
 
-        default_package_version = "10.2.2"
+        default_package_version = "10.2.3"
 
         if scala_version:
             scala_ver = Version(scala_version).min_digits(2)

@@ -190,7 +190,7 @@ def package_spark_3_2(cls) -> str:
             "use `MongoDB.get_packages(spark_version='3.2')` instead"
         )
         warnings.warn(msg, UserWarning, stacklevel=3)
-        return "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2"
+        return "org.mongodb.spark:mongo-spark-connector_2.12:10.2.3"
 
     @classproperty
     def package_spark_3_3(cls) -> str:

@@ -200,7 +200,7 @@ def package_spark_3_3(cls) -> str:
             "use `MongoDB.get_packages(spark_version='3.3')` instead"
         )
         warnings.warn(msg, UserWarning, stacklevel=3)
-        return "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2"
+        return "org.mongodb.spark:mongo-spark-connector_2.12:10.2.3"
 
     @classproperty
     def package_spark_3_4(cls) -> str:

@@ -210,7 +210,7 @@ def package_spark_3_4(cls) -> str:
             "use `MongoDB.get_packages(spark_version='3.4')` instead"
         )
         warnings.warn(msg, UserWarning, stacklevel=3)
-        return "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2"
+        return "org.mongodb.spark:mongo-spark-connector_2.12:10.2.3"
 
     @slot
     def pipeline(
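A short usage sketch of the bumped default (assuming onetl's public ``from onetl.connection import MongoDB`` import path; the expected values follow from ``default_package_version`` shown above):

from onetl.connection import MongoDB

# No explicit package_version: the new 10.2.3 default should apply
print(MongoDB.get_packages(scala_version="2.12"))
# expected: ['org.mongodb.spark:mongo-spark-connector_2.12:10.2.3']

# Pinning an older connector version is still supported
print(MongoDB.get_packages(scala_version="2.12", package_version="10.1.1"))
# expected: ['org.mongodb.spark:mongo-spark-connector_2.12:10.1.1']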
2 changes: 1 addition & 1 deletion tests/fixtures/processing/clickhouse.py
@@ -20,7 +20,7 @@ class ClickhouseProcessing(BaseProcessing):
         "text_string": "String",
         "hwm_int": "Int32",
         "hwm_date": "Date",
-        "hwm_datetime": "DateTime",
+        "hwm_datetime": "DateTime64(6)",
         "float_value": "Float32",
     }
 
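For context, a hypothetical sketch of the DDL this column mapping implies for the test table (the actual table creation lives in the processing base class and is not part of this diff):

# Hypothetical: how the fixture's column types could translate into DDL.
columns = {
    "text_string": "String",
    "hwm_int": "Int32",
    "hwm_date": "Date",
    "hwm_datetime": "DateTime64(6)",  # previously DateTime (second precision)
    "float_value": "Float32",
}
fields = ", ".join(f"{name} {type_}" for name, type_ in columns.items())
print(f"CREATE TABLE test_table ({fields}) ENGINE = MergeTree ORDER BY hwm_int")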
@@ -302,12 +302,37 @@ def test_clickhouse_strategy_incremental_explicit_hwm_type(
             ColumnDateHWM,
             lambda x: x.isoformat(),
         ),
+        pytest.param(
+            "hwm_date",
+            "CAST(text_string AS Date32)",
+            ColumnDateHWM,
+            lambda x: x.isoformat(),
+            marks=pytest.mark.xfail(reason="Date32 type was added in ClickHouse 21.9"),
+        ),
         (
             "hwm_datetime",
             "CAST(text_string AS DateTime)",
             ColumnDateTimeHWM,
             lambda x: x.isoformat(),
         ),
+        (
+            "hwm_datetime",
+            "CAST(text_string AS DateTime64)",
+            ColumnDateTimeHWM,
+            lambda x: x.isoformat(),
+        ),
+        (
+            "hwm_datetime",
+            "CAST(text_string AS DateTime64(3))",
+            ColumnDateTimeHWM,
+            lambda x: x.isoformat(),
+        ),
+        (
+            "hwm_datetime",
+            "CAST(text_string AS DateTime64(6))",
+            ColumnDateTimeHWM,
+            lambda x: x.isoformat(),
+        ),
     ],
 )
 def test_clickhouse_strategy_incremental_with_hwm_expr(
18 changes: 9 additions & 9 deletions tests/tests_unit/tests_db_connection_unit/test_mongodb_unit.py
@@ -12,9 +12,9 @@
 def test_mongodb_package():
     warning_msg = re.escape("will be removed in 1.0.0, use `MongoDB.get_packages(spark_version=")
     with pytest.warns(UserWarning, match=warning_msg):
-        assert MongoDB.package_spark_3_2 == "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2"
-        assert MongoDB.package_spark_3_3 == "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2"
-        assert MongoDB.package_spark_3_4 == "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2"
+        assert MongoDB.package_spark_3_2 == "org.mongodb.spark:mongo-spark-connector_2.12:10.2.3"
+        assert MongoDB.package_spark_3_3 == "org.mongodb.spark:mongo-spark-connector_2.12:10.2.3"
+        assert MongoDB.package_spark_3_4 == "org.mongodb.spark:mongo-spark-connector_2.12:10.2.3"
 
 
 def test_mongodb_get_packages_no_input():

@@ -50,16 +50,16 @@ def test_mongodb_get_packages_scala_version_not_supported(scala_version):
 @pytest.mark.parametrize(
     "spark_version, scala_version, package_version, package",
     [
-        (None, "2.12", "10.2.2", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2"),
-        (None, "2.13", "10.2.2", "org.mongodb.spark:mongo-spark-connector_2.13:10.2.2"),
-        ("3.2", None, "10.2.2", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2"),
-        ("3.3", None, "10.2.2", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2"),
-        ("3.4", None, "10.2.2", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2"),
+        (None, "2.12", "10.2.3", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.3"),
+        (None, "2.13", "10.2.3", "org.mongodb.spark:mongo-spark-connector_2.13:10.2.3"),
+        ("3.2", None, "10.2.3", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.3"),
+        ("3.3", None, "10.2.3", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.3"),
+        ("3.4", None, "10.2.3", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.3"),
         ("3.2", "2.12", "10.1.1", "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1"),
         ("3.4", "2.13", "10.1.1", "org.mongodb.spark:mongo-spark-connector_2.13:10.1.1"),
         ("3.2", "2.12", "10.2.1", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.1"),
         ("3.2", "2.12", "10.2.0", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.0"),
-        ("3.2.4", "2.12.1", "10.2.2", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2"),
+        ("3.2.4", "2.12.1", "10.2.3", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.3"),
     ],
 )
 def test_mongodb_get_packages(spark_version, scala_version, package_version, package):
