Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOP-8664] - Allow modes "ignore" and "error" in MongoDB.WriteOptions #145

Merged
merged 9 commits into from
Sep 11, 2023
1 change: 1 addition & 0 deletions docs/changelog/next_release/145.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ``if_exists="ignore"`` and ``error`` to ``MongoDB.WriteOptions``
20 changes: 20 additions & 0 deletions onetl/connection/db_connection/mongodb/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,16 @@ def write_df_to_target(
else "append"
)

if self._collection_exists(target):
if write_options.if_exists == MongoDBCollectionExistBehavior.ERROR:
raise ValueError("Operation stopped due to if_exists set to 'error'.")
maxim-lixakov marked this conversation as resolved.
Show resolved Hide resolved
elif write_options.if_exists == MongoDBCollectionExistBehavior.IGNORE:
log.info(
"|%s| No further action is taken due to if_exists is set to 'ignore'",
dolfinus marked this conversation as resolved.
Show resolved Hide resolved
self.__class__.__name__,
)
return

log.info("|%s| Saving data to a collection %r", self.__class__.__name__, target)
df.write.format("mongodb").mode(mode).options(**write_options_dict).save()
log.info("|%s| Collection %r is successfully written", self.__class__.__name__, target)
Expand Down Expand Up @@ -533,3 +543,13 @@ def _check_java_class_imported(cls, spark):
log.debug("Missing Java class", exc_info=e, stack_info=True)
raise ValueError(msg) from e
return spark

def _collection_exists(self, source: str) -> bool:
jvm = self.spark._jvm
client = jvm.com.mongodb.client.MongoClients.create(self.connection_url) # type: ignore
collections = set(client.getDatabase(self.database).listCollectionNames().iterator())
if source in collections:
log.info("|%s| Collection %r exists", self.__class__.__name__, source)
return True
log.info("|%s| Collection %r does not exist", self.__class__.__name__, source)
return False
51 changes: 36 additions & 15 deletions onetl/connection/db_connection/mongodb/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@

class MongoDBCollectionExistBehavior(str, Enum):
APPEND = "append"
IGNORE = "ignore"
ERROR = "error"
REPLACE_ENTIRE_COLLECTION = "replace_entire_collection"

def __str__(self) -> str:
Expand Down Expand Up @@ -207,33 +209,52 @@ class MongoDBWriteOptions(GenericOptions):

.. dropdown:: Behavior in details

* Collection does not exist
Collection is created using options provided by user
(``shardkey`` and others).
* Collection does not exist
Collection is created using options provided by user
(``shardkey`` and others).

* Collection exists
Data is appended to a collection.
* Collection exists
Data is appended to a collection.

.. warning::
.. warning::

This mode does not check whether collection already contains
objects from dataframe, so duplicated objects can be created.
This mode does not check whether collection already contains
objects from dataframe, so duplicated objects can be created.

* ``replace_entire_collection``
**Collection is deleted and then created**.

.. dropdown:: Behavior in details

* Collection does not exist
Collection is created using options provided by user
(``shardkey`` and others).
* Collection does not exist
Collection is created using options provided by user
(``shardkey`` and others).

* Collection exists
Collection content is replaced with dataframe content.
* Collection exists
Collection content is replaced with dataframe content.

.. note::
* ``ignore``
Ignores the write operation if the collection already exists.

.. dropdown:: Behavior in details

* Collection does not exist
Collection is created using options provided by user

* Collection exists
The write operation is ignored, and no data is written to the collection.

* ``error``
Raises an error if the collection already exists.

.. dropdown:: Behavior in details

* Collection does not exist
Collection is created using options provided by user

* Collection exists
An error is raised, and no data is written to the collection.

``error`` and ``ignore`` modes are not supported.
"""

class Config:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging

import pytest

from onetl.connection import MongoDB
Expand All @@ -6,8 +8,18 @@
pytestmark = pytest.mark.mongodb


@pytest.mark.parametrize(
"options",
[
{},
{"if_exists": "append"},
{"if_exists": "replace_entire_collection"},
{"if_exists": "error"},
{"if_exists": "ignore"},
],
)
@pytest.mark.flaky(reruns=2)
def test_mongodb_writer_snapshot(spark, processing, prepare_schema_table):
def test_mongodb_writer_snapshot(spark, processing, get_schema_table, options, caplog):
df = processing.create_spark_df(spark=spark)

mongo = MongoDB(
Expand All @@ -21,12 +33,132 @@ def test_mongodb_writer_snapshot(spark, processing, prepare_schema_table):

writer = DBWriter(
connection=mongo,
table=prepare_schema_table.table,
table=get_schema_table.table,
options=MongoDB.WriteOptions(**options),
)
writer.run(df)

with caplog.at_level(logging.INFO):
writer.run(df)

assert f"|MongoDB| Collection '{get_schema_table.table}' does not exist" in caplog.text

processing.assert_equal_df(
schema=prepare_schema_table.schema,
table=prepare_schema_table.table,
schema=get_schema_table.schema,
table=get_schema_table.table,
df=df,
)


def test_mongodb_writer_if_exists_append(spark, processing, get_schema_table):
df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
df1 = df[df._id < 1001]
df2 = df[df._id > 1000]

mongo = MongoDB(
host=processing.host,
port=processing.port,
user=processing.user,
password=processing.password,
database=processing.database,
spark=spark,
)

writer = DBWriter(
connection=mongo,
table=get_schema_table.table,
options=MongoDB.WriteOptions(if_exists="append"),
)
writer.run(df1)
writer.run(df2)

processing.assert_equal_df(
schema=get_schema_table.schema,
table=get_schema_table.table,
df=df,
)


def test_mongodb_writer_if_exists_replace_entire_collection(spark, processing, get_schema_table):
df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
df1 = df[df._id < 1001]
df2 = df[df._id > 1000]

mongo = MongoDB(
host=processing.host,
port=processing.port,
user=processing.user,
password=processing.password,
database=processing.database,
spark=spark,
)

writer = DBWriter(
connection=mongo,
table=get_schema_table.table,
options=MongoDB.WriteOptions(if_exists="replace_entire_collection"),
)
writer.run(df1)
writer.run(df2)

processing.assert_equal_df(
schema=get_schema_table.schema,
table=get_schema_table.table,
df=df2,
)


def test_mongodb_writer_if_exists_error(spark, processing, get_schema_table, caplog):
df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)

mongo = MongoDB(
host=processing.host,
port=processing.port,
user=processing.user,
password=processing.password,
database=processing.database,
spark=spark,
)

writer = DBWriter(
connection=mongo,
table=get_schema_table.table,
options=MongoDB.WriteOptions(if_exists="error"),
)
writer.run(df)

with pytest.raises(ValueError, match="Operation stopped due to if_exists set to 'error'."):
writer.run(df)
dolfinus marked this conversation as resolved.
Show resolved Hide resolved


def test_mongodb_writer_if_exists_ignore(spark, processing, get_schema_table, caplog):
df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
df1 = df[df._id < 1001]
df2 = df[df._id > 1000]

mongo = MongoDB(
host=processing.host,
port=processing.port,
user=processing.user,
password=processing.password,
database=processing.database,
spark=spark,
)

writer = DBWriter(
connection=mongo,
table=get_schema_table.table,
options=MongoDB.WriteOptions(if_exists="ignore"),
)
writer.run(df1)

with caplog.at_level(logging.INFO):
writer.run(df2) # The write operation is ignored

assert f"|MongoDB| Collection '{get_schema_table.table}' exists" in caplog.text
assert "|MongoDB| No further action is taken due to if_exists is set to 'ignore'" in caplog.text

processing.assert_equal_df(
schema=get_schema_table.schema,
table=get_schema_table.table,
df=df1,
)
18 changes: 14 additions & 4 deletions tests/tests_unit/tests_db_connection_unit/test_mongodb_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ def test_mongodb_convert_dict_to_str():
[
({}, MongoDBCollectionExistBehavior.APPEND),
({"if_exists": "append"}, MongoDBCollectionExistBehavior.APPEND),
({"if_exists": "ignore"}, MongoDBCollectionExistBehavior.IGNORE),
({"if_exists": "error"}, MongoDBCollectionExistBehavior.ERROR),
({"if_exists": "replace_entire_collection"}, MongoDBCollectionExistBehavior.REPLACE_ENTIRE_COLLECTION),
],
)
Expand Down Expand Up @@ -261,6 +263,18 @@ def test_mongodb_write_options_if_exists(options, value):
"Mode `overwrite` is deprecated since v0.9.0 and will be removed in v1.0.0. "
"Use `replace_entire_collection` instead",
),
(
{"mode": "ignore"},
MongoDBCollectionExistBehavior.IGNORE,
"Option `MongoDB.WriteOptions(mode=...)` is deprecated since v0.9.0 and will be removed in v1.0.0. "
"Use `MongoDB.WriteOptions(if_exists=...)` instead",
),
(
{"mode": "error"},
MongoDBCollectionExistBehavior.ERROR,
"Option `MongoDB.WriteOptions(mode=...)` is deprecated since v0.9.0 and will be removed in v1.0.0. "
"Use `MongoDB.WriteOptions(if_exists=...)` instead",
),
],
)
def test_mongodb_write_options_mode_deprecated(options, value, message):
Expand All @@ -272,10 +286,6 @@ def test_mongodb_write_options_mode_deprecated(options, value, message):
@pytest.mark.parametrize(
"options",
[
# disallowed modes
{"mode": "error"},
{"mode": "ignore"},
# wrong mode
{"mode": "wrong_mode"},
],
)
Expand Down
Loading