Skip to content

Commit

Permalink
Merge branch 'develop' into feature/DOP-8664
Browse files Browse the repository at this point in the history
  • Loading branch information
maxim-lixakov committed Sep 11, 2023
2 parents acd4ac5 + 0adf055 commit e205d8e
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 34 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/144.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ``if_exists="ignore"`` and ``if_exists="error"`` to ``JDBC.WriteOptions``
6 changes: 5 additions & 1 deletion onetl/connection/db_connection/jdbc_connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,11 @@ def write_df_to_target(
write_options = self.WriteOptions.parse(options)
jdbc_params = self.options_to_jdbc_params(write_options)

mode = "append" if write_options.if_exists == JDBCTableExistBehavior.APPEND else "overwrite"
mode = (
"overwrite"
if write_options.if_exists == JDBCTableExistBehavior.REPLACE_ENTIRE_TABLE
else write_options.if_exists.value
)
log.info("|%s| Saving data to a table %r", self.__class__.__name__, target)
df.write.jdbc(table=target, mode=mode, **jdbc_params)
log.info("|%s| Table %r successfully written", self.__class__.__name__, target)
Expand Down
69 changes: 46 additions & 23 deletions onetl/connection/db_connection/jdbc_connection/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@

class JDBCTableExistBehavior(str, Enum):
APPEND = "append"
IGNORE = "ignore"
ERROR = "error"
REPLACE_ENTIRE_TABLE = "replace_entire_table"

def __str__(self) -> str:
Expand Down Expand Up @@ -413,44 +415,65 @@ class Config:
.. dropdown:: Behavior in details
* Table does not exist
Table is created using options provided by user
(``createTableOptions``, ``createTableColumnTypes``, etc).
* Table does not exist
Table is created using options provided by user
(``createTableOptions``, ``createTableColumnTypes``, etc).
* Table exists
Data is appended to a table. Table has the same DDL as before writing data
* Table exists
Data is appended to a table. Table has the same DDL as before writing data
.. warning::
.. warning::
This mode does not check whether table already contains
rows from dataframe, so duplicated rows can be created.
This mode does not check whether table already contains
rows from dataframe, so duplicated rows can be created.
Also Spark does not support passing custom options to
insert statement, like ``ON CONFLICT``, so don't try to
implement deduplication using unique indexes or constraints.
Also Spark does not support passing custom options to
insert statement, like ``ON CONFLICT``, so don't try to
implement deduplication using unique indexes or constraints.
Instead, write to staging table and perform deduplication
using :obj:`~execute` method.
Instead, write to staging table and perform deduplication
using :obj:`~execute` method.
* ``replace_entire_table``
**Table is dropped and then created, or truncated**.
.. dropdown:: Behavior in details
* Table does not exist
Table is created using options provided by user
(``createTableOptions``, ``createTableColumnTypes``, etc).
* Table does not exist
Table is created using options provided by user
(``createTableOptions``, ``createTableColumnTypes``, etc).
* Table exists
Table content is replaced with dataframe content.
* Table exists
Table content is replaced with dataframe content.
After writing completed, target table could either have the same DDL as
before writing data (``truncate=True``), or can be recreated (``truncate=False``
or source does not support truncation).
After writing completed, target table could either have the same DDL as
before writing data (``truncate=True``), or can be recreated (``truncate=False``
or source does not support truncation).
.. note::
* ``ignore``
Ignores the write operation if the table already exists.
.. dropdown:: Behavior in details
* Table does not exist
Table is created using options provided by user
(``createTableOptions``, ``createTableColumnTypes``, etc).
* Table exists
The write operation is ignored, and no data is written to the table.
* ``error``
Raises an error if the table already exists.
.. dropdown:: Behavior in details
* Table does not exist
Table is created using options provided by user
(``createTableOptions``, ``createTableColumnTypes``, etc).
* Table exists
An error is raised, and no data is written to the table.
``error`` and ``ignore`` modes are not supported.
"""

batchsize: int = 20_000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@
pytestmark = pytest.mark.postgres


def test_postgres_writer_snapshot(spark, processing, prepare_schema_table):
@pytest.mark.parametrize(
"options",
[
{},
{"if_exists": "append"},
{"if_exists": "replace_entire_table"},
{"if_exists": "error"},
{"if_exists": "ignore"},
],
)
def test_postgres_writer_snapshot(spark, processing, get_schema_table, options):
df = processing.create_spark_df(spark=spark)

postgres = Postgres(
Expand All @@ -20,14 +30,15 @@ def test_postgres_writer_snapshot(spark, processing, prepare_schema_table):

writer = DBWriter(
connection=postgres,
target=prepare_schema_table.full_name,
target=get_schema_table.full_name,
options=Postgres.WriteOptions(**options),
)

writer.run(df)

processing.assert_equal_df(
schema=prepare_schema_table.schema,
table=prepare_schema_table.table,
schema=get_schema_table.schema,
table=get_schema_table.table,
df=df,
)

Expand Down Expand Up @@ -86,7 +97,7 @@ def test_postgres_writer_snapshot_with_pydantic_options(spark, processing, prepa
)


def test_postgres_writer_mode_append(spark, processing, prepare_schema_table):
def test_postgres_writer_if_exists_append(spark, processing, prepare_schema_table):
df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
df1 = df[df.id_int < 1001]
df2 = df[df.id_int > 1000]
Expand Down Expand Up @@ -116,7 +127,70 @@ def test_postgres_writer_mode_append(spark, processing, prepare_schema_table):
)


def test_postgres_writer_mode_replace_entire_table(spark, processing, prepare_schema_table):
def test_postgres_writer_if_exists_error(spark, processing, prepare_schema_table):
    from pyspark.sql.utils import AnalysisException

    # Dataframe we will attempt (and fail) to write into an already-existing table.
    df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)

    connection = Postgres(
        host=processing.host,
        port=processing.port,
        user=processing.user,
        password=processing.password,
        database=processing.database,
        spark=spark,
    )

    db_writer = DBWriter(
        connection=connection,
        target=prepare_schema_table.full_name,
        options=Postgres.WriteOptions(if_exists="error"),
    )

    # `if_exists="error"` maps to Spark's ErrorIfExists save mode,
    # which must raise because the target table already exists.
    expected_error = f"Table or view '{prepare_schema_table.full_name}' already exists. SaveMode: ErrorIfExists."
    with pytest.raises(
        AnalysisException,
        match=expected_error,
    ):
        db_writer.run(df)

    # The failed write must not have touched the table — it stays empty.
    no_rows = spark.createDataFrame([], df.schema)

    processing.assert_equal_df(
        schema=prepare_schema_table.schema,
        table=prepare_schema_table.table,
        df=no_rows,
    )


def test_postgres_writer_if_exists_ignore(spark, processing, prepare_schema_table):
    # Dataframe we will try to write into an already-existing table.
    df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)

    connection = Postgres(
        host=processing.host,
        port=processing.port,
        user=processing.user,
        password=processing.password,
        database=processing.database,
        spark=spark,
    )

    db_writer = DBWriter(
        connection=connection,
        target=prepare_schema_table.full_name,
        options=Postgres.WriteOptions(if_exists="ignore"),
    )

    # With `if_exists="ignore"` the write against an existing table
    # is silently skipped — no exception, no data written.
    db_writer.run(df)

    # Target table must still be empty: the write was a no-op.
    no_rows = spark.createDataFrame([], df.schema)

    processing.assert_equal_df(
        schema=prepare_schema_table.schema,
        table=prepare_schema_table.table,
        df=no_rows,
    )


def test_postgres_writer_if_exists_replace_entire_table(spark, processing, prepare_schema_table):
df = processing.create_spark_df(spark=spark, min_id=1, max_id=1500)
df1 = df[df.id_int < 1001]
df2 = df[df.id_int > 1000]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ def test_jdbc_write_options_to_jdbc(spark_mock):
[
({}, JDBCTableExistBehavior.APPEND),
({"if_exists": "append"}, JDBCTableExistBehavior.APPEND),
({"if_exists": "ignore"}, JDBCTableExistBehavior.IGNORE),
({"if_exists": "error"}, JDBCTableExistBehavior.ERROR),
({"if_exists": "replace_entire_table"}, JDBCTableExistBehavior.REPLACE_ENTIRE_TABLE),
],
)
Expand Down Expand Up @@ -294,6 +296,18 @@ def test_jdbc_write_options_if_exists(options, value):
"Mode `overwrite` is deprecated since v0.9.0 and will be removed in v1.0.0. "
"Use `replace_entire_table` instead",
),
(
{"mode": "ignore"},
JDBCTableExistBehavior.IGNORE,
"Option `WriteOptions(mode=...)` is deprecated since v0.9.0 and will be removed in v1.0.0. "
"Use `WriteOptions(if_exists=...)` instead",
),
(
{"mode": "error"},
JDBCTableExistBehavior.ERROR,
"Option `WriteOptions(mode=...)` is deprecated since v0.9.0 and will be removed in v1.0.0. "
"Use `WriteOptions(if_exists=...)` instead",
),
],
)
def test_jdbc_write_options_mode_deprecated(options, value, message):
Expand All @@ -305,10 +319,6 @@ def test_jdbc_write_options_mode_deprecated(options, value, message):
@pytest.mark.parametrize(
"options",
[
# disallowed modes
{"mode": "error"},
{"mode": "ignore"},
# wrong mode
{"mode": "wrong_mode"},
],
)
Expand Down

0 comments on commit e205d8e

Please sign in to comment.