Implement remaining Update_Actions for update_database_table. #7035

Merged
merged 11 commits into from
Jun 15, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@
- [Speed improvements to `Column` `.truncate`, `.ceil`, and `.floor`.][6941]
- [Implemented addition and subtraction for `Date_Period` and
`Time_Period`.][6956]
- [Implemented `Table.update_database_table`.][7035]

[debug-shortcuts]:
https://github.com/enso-org/enso/blob/develop/app/gui/docs/product/shortcuts.md#debug
Expand Down Expand Up @@ -704,6 +705,7 @@
[6925]: https://github.com/enso-org/enso/pull/6925
[6941]: https://github.com/enso-org/enso/pull/6941
[6956]: https://github.com/enso-org/enso/pull/6956
[7035]: https://github.com/enso-org/enso/pull/7035

#### Enso Compiler

Expand Up @@ -72,7 +72,7 @@ type Builder
join separator statements =
sep = case separator of
Builder.Value _ -> separator
_ -> Builder.code separator
_ : Text -> Builder.code separator

if statements.length == 0 then Builder.empty else
(1.up_to statements.length . fold (statements.at 0) acc-> i-> acc ++ sep ++ statements.at i)
Expand Up @@ -180,13 +180,13 @@ type Unmatched_Rows
## PRIVATE
Indicates that the `Update` operation encountered input rows that did not
have any matching rows in the target table.
Error
Error (count : Integer)

## PRIVATE
Pretty print the unmatched rows error.
to_display_text : Text
to_display_text self =
"The `Update` operation encountered input rows that did not have any matching rows in the target table. The operation has been rolled back. Consider `Update_Or_Insert` if you want to insert rows that do not match any existing rows."
"The `Update` operation encountered " + self.count.to_text + " input rows that did not have any matching rows in the target table. The operation has been rolled back. Consider `Update_Or_Insert` if you want to insert rows that do not match any existing rows."

type Rows_Already_Present
## PRIVATE
Expand All @@ -204,10 +204,10 @@ type Multiple_Target_Rows_Matched_For_Update
## PRIVATE
Indicates that the source table had rows matching multiple rows in the
target table by the specified key.
Error
Error (example_key : Vector Any) (example_count : Integer)

## PRIVATE
Pretty print the multiple target rows matched for update error.
to_display_text : Text
to_display_text self =
"The update operation encountered input rows that matched multiple rows in the target table. The operation has been rolled back. You may need to use a more specific key for matching."
"The update operation encountered input rows that matched multiple rows in the target table (for example, the key " + self.example_key.to_display_text + " matched " + self.example_count.to_text + " rows). The operation has been rolled back. You may need to use a more specific key for matching."
Expand Up @@ -442,6 +442,27 @@ generate_query dialect query = case query of
columns_part = Builder.join ", " (column_names.map dialect.wrap_identifier) . paren
Builder.code "INSERT INTO " ++ dialect.wrap_identifier table_name ++ " " ++ columns_part ++ " " ++ inner_query
_ -> Panic.throw (Illegal_State.Error "The inner query of `Query.Insert_From_Select` must be `Query.Select`, but it was "+select_query.to_display_text+". This is a bug in the Database library.")
Query.Update_From_Table target_table_name source_table_name column_names key_columns ->
target = dialect.wrap_identifier target_table_name
source = dialect.wrap_identifier source_table_name
if column_names.is_empty then
Panic.throw (Illegal_State.Error "Invalid IR: `Query.Update_From_Table` must have at least one column to update. This is a bug in the Database library.")
if key_columns.is_empty then
Panic.throw (Illegal_State.Error "Invalid IR: `Query.Update_From_Table` must have at least one key column. This is a bug in the Database library.")
set_part = Builder.join ", " <| column_names.map column_name->
id = dialect.wrap_identifier column_name
id ++ "=" ++ source ++ "." ++ id
key_part = Builder.join " AND " <| key_columns.map column_name->
id = dialect.wrap_identifier column_name
target ++ "." ++ id ++ "=" ++ source ++ "." ++ id
Builder.code "UPDATE " ++ target ++ " SET " ++ set_part ++ " FROM " ++ source ++ " WHERE " ++ key_part
Query.Delete_Unmatched_Rows target_table_name source_table_name key_columns ->
target = dialect.wrap_identifier target_table_name
source = dialect.wrap_identifier source_table_name
if key_columns.is_empty then
Panic.throw (Illegal_State.Error "Invalid IR: `Query.Delete_Unmatched_Rows` must have at least one key column. This is a bug in the Database library.")
key_part = Builder.join ", " (key_columns.map dialect.wrap_identifier)
Builder.code "DELETE FROM " ++ target ++ " WHERE (" ++ key_part ++ ") NOT IN (SELECT " ++ key_part ++ " FROM " ++ source ++ ")"
_ -> Error.throw <| Unsupported_Database_Operation.Error "Unsupported query type: "+query.to_text
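
To make the generated SQL concrete, here is a minimal Python sketch that mirrors the string assembly of the two new branches above (`Query.Update_From_Table` and `Query.Delete_Unmatched_Rows`). The function names are hypothetical, and the double-quote identifier quoting is an assumption; in the real code the quoting is whatever the dialect's `wrap_identifier` produces.

```python
def wrap_identifier(name: str) -> str:
    # Assumption: ANSI-style quoting, doubling any embedded double quotes.
    return '"' + name.replace('"', '""') + '"'


def update_from_table_sql(target, source, column_names, key_columns):
    # Mirrors the two "Invalid IR" guards in the diff.
    if not column_names:
        raise ValueError("must have at least one column to update")
    if not key_columns:
        raise ValueError("must have at least one key column")
    tgt, src = wrap_identifier(target), wrap_identifier(source)
    # SET c1=source.c1, c2=source.c2, ...
    set_part = ", ".join(
        f"{wrap_identifier(c)}={src}.{wrap_identifier(c)}" for c in column_names
    )
    # WHERE target.k1=source.k1 AND ...
    key_part = " AND ".join(
        f"{tgt}.{wrap_identifier(k)}={src}.{wrap_identifier(k)}" for k in key_columns
    )
    return f"UPDATE {tgt} SET {set_part} FROM {src} WHERE {key_part}"


def delete_unmatched_rows_sql(target, source, key_columns):
    if not key_columns:
        raise ValueError("must have at least one key column")
    tgt, src = wrap_identifier(target), wrap_identifier(source)
    key_part = ", ".join(wrap_identifier(k) for k in key_columns)
    return f"DELETE FROM {tgt} WHERE ({key_part}) NOT IN (SELECT {key_part} FROM {src})"


print(update_from_table_sql("t", "s", ["a", "b"], ["a"]))
print(delete_unmatched_rows_sql("t", "s", ["k1", "k2"]))
```

Note that the `SET` list is built from every column in `column_names`, so key columns that also appear there are reassigned to the value they already match on.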

## PRIVATE
Expand Up @@ -62,3 +62,22 @@ type Query
The columns in that query should correspond to the columns specified
in `column_names`, matching by position.
Insert_From_Select (table_name:Text) (column_names : Vector Text) (select:Query)

## PRIVATE
An SQL UPDATE query that updates rows in the target table with values
from the source table when the key columns match. Target table rows that
do not match the source are left unaffected.

This will usually be a query of the form
`UPDATE target SET c1=source.c1, ... FROM source WHERE target.key1=source.key1 AND ...`
where `c1` comes from `column_names` and `key1` comes from `key_columns`.
Update_From_Table (target_table_name:Text) (source_table_name:Text) (column_names : Vector Text) (key_columns : Vector Text)

## PRIVATE
An SQL DELETE query that deletes from target table rows that are _not_
present in the source table, based on the specified key columns.
The key columns must be present under the same name in both tables.

This will usually be a query of the form
`DELETE FROM target WHERE (key_columns...) NOT IN (SELECT key_columns... FROM source)`.
Member


I wonder if a DELETE/JOIN would work better.

Add a constant marker to the source, and then something like:

DELETE target
  FROM target 
  LEFT JOIN source ON  ...
 WHERE source.marker is NULL

Member Author


This kind of syntax does not seem to be supported in either of the DBs we currently have.

Member Author


In SQLite the DELETE syntax only allows for a WHERE clause, no joins are available.

In Postgres, I guess we may try to play with MERGE but that is for a later ticket, I think - so to be revisited in #7036
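
For readers following this thread, a small sketch of what the `WHERE`-only form looks like when run against SQLite through Python's `sqlite3` module. The table layout (`k1`, `k2`, `v`) and the helper names are made up for illustration. The `NOT EXISTS` variant is not part of this PR; it is shown only as another way to express the same anti-join without a `JOIN` in the `DELETE`.

```python
import sqlite3


def make_db():
    # Hypothetical two-column key (k1, k2) plus one value column.
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE target (k1 INTEGER, k2 TEXT, v TEXT);
        CREATE TABLE source (k1 INTEGER, k2 TEXT, v TEXT);
        INSERT INTO target VALUES (1, 'a', 'x'), (1, 'b', 'y'), (2, 'a', 'z');
        INSERT INTO source VALUES (1, 'a', 'new'), (2, 'a', 'new');
    """)
    return conn


def delete_unmatched(conn, use_not_exists=False):
    if use_not_exists:
        # Alternative phrasing (not from the PR): correlated NOT EXISTS.
        sql = ("DELETE FROM target WHERE NOT EXISTS ("
               "SELECT 1 FROM source "
               "WHERE source.k1 = target.k1 AND source.k2 = target.k2)")
    else:
        # Same shape as the query generated for Delete_Unmatched_Rows.
        sql = "DELETE FROM target WHERE (k1, k2) NOT IN (SELECT k1, k2 FROM source)"
    conn.execute(sql)
    return conn.execute("SELECT k1, k2 FROM target ORDER BY k1, k2").fetchall()


print(delete_unmatched(make_db()))
print(delete_unmatched(make_db(), use_not_exists=True))
```

In both cases only the target row with key `(1, 'b')` is removed, since it is the only one without a counterpart in `source`.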

Delete_Unmatched_Rows (target_table_name:Text) (source_table_name:Text) (key_columns : Vector Text)
@@ -1,7 +1,6 @@
from Standard.Base import all
from Standard.Base.Random import random_uuid
import Standard.Base.Errors.Illegal_Argument.Illegal_Argument
import Standard.Base.Errors.Illegal_State.Illegal_State

import Standard.Table.Data.Table.Table as In_Memory_Table
from Standard.Table import Aggregate_Column, Join_Kind, Value_Type, Column_Selector
Expand Down Expand Up @@ -227,36 +226,122 @@ common_update_table source_table connection table_name update_action key_columns
Error.throw new_error
if target_table.is_error then handle_error else
tmp_table_name = "temporary-source-table-"+random_uuid
tmp_table = internal_upload_table source_table connection tmp_table_name key_columns temporary=True on_problems=Problem_Behavior.Report_Error
tmp_table = internal_upload_table source_table connection tmp_table_name primary_key=key_columns temporary=True on_problems=Problem_Behavior.Report_Error
tmp_table.if_not_error <|
resulting_table = append_to_existing_table tmp_table target_table update_action key_columns error_on_missing_columns on_problems
connection.drop_table tmp_table.name
resulting_table

## PRIVATE
Assumes that `source_table` is a simple table query without any filters,
joins and other composite operations - if a complex query is needed, it
should be first materialized into a temporary table.
append_to_existing_table source_table target_table update_action key_columns error_on_missing_columns on_problems = In_Transaction.ensure_in_transaction <|
effective_key_columns = if key_columns.is_nothing then [] else key_columns
check_update_arguments source_table target_table effective_key_columns update_action error_on_missing_columns on_problems <|
throw_not_implemented =
Error.throw (Illegal_State.Error "Not implemented yet. Only `Insert` is implemented.")
helper = Append_Helper.Context source_table target_table effective_key_columns
upload_status = case update_action of
Update_Action.Insert ->
existing_rows_check = if effective_key_columns.is_empty then True else
joined = source_table.join target_table on=key_columns join_kind=Join_Kind.Inner
count = joined.row_count
if count == 0 then True else
Error.throw (Rows_Already_Present.Error count)
existing_rows_check.if_not_error <|
connection = target_table.connection
insert_statement = connection.dialect.generate_sql <|
Query.Insert_From_Select target_table.name source_table.column_names source_table.to_select_query
Panic.rethrow <| connection.execute_update insert_statement
Update_Action.Update -> throw_not_implemented
Update_Action.Update_Or_Insert -> throw_not_implemented
Update_Action.Align_Records -> throw_not_implemented
helper.check_already_existing_rows <|
helper.insert_rows source_table
Update_Action.Update ->
helper.check_rows_unmatched_in_target <|
helper.check_multiple_target_rows_match <|
helper.update_common_rows
Update_Action.Update_Or_Insert ->
helper.check_multiple_target_rows_match <|
helper.update_common_rows
helper.insert_rows helper.new_source_rows
Update_Action.Align_Records ->
helper.check_multiple_target_rows_match <|
helper.update_common_rows
helper.insert_rows helper.new_source_rows
helper.delete_unmatched_target_rows
upload_status.if_not_error target_table
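
As a reading aid for the four branches above, here is a rough in-memory model of the intended semantics in Python. It is a sketch under assumptions, not the library's behaviour: tables are plain lists of dicts, the function name and the `ValueError`s are hypothetical stand-ins for the Enso error types, source keys are assumed unique (as guaranteed by the temporary table's primary key), and `key` is assumed non-empty.

```python
from collections import Counter

ACTIONS = ("Insert", "Update", "Update_Or_Insert", "Align_Records")


def apply_update_action(action, source, target, key):
    """Return the new target rows after applying `action`."""
    if action not in ACTIONS:
        raise ValueError(f"unknown action: {action}")

    def key_of(row):
        return tuple(row[k] for k in key)

    source_by_key = {key_of(r): r for r in source}
    target_counts = Counter(key_of(r) for r in target)
    matched = [k for k in source_by_key if target_counts[k] > 0]
    new_keys = [k for k in source_by_key if target_counts[k] == 0]

    if action == "Insert":
        # Stand-in for check_already_existing_rows.
        if matched:
            raise ValueError(f"{len(matched)} source rows already present")
        return target + list(source_by_key.values())

    # Stand-in for check_rows_unmatched_in_target (only used by Update).
    if action == "Update" and new_keys:
        raise ValueError(f"{len(new_keys)} source rows have no match in target")

    # Stand-in for check_multiple_target_rows_match.
    duplicated = [k for k in matched if target_counts[k] > 1]
    if duplicated:
        k = duplicated[0]
        raise ValueError(f"key {k} matched {target_counts[k]} target rows")

    # update_common_rows: overwrite matched target rows with source values.
    result = [{**row, **source_by_key.get(key_of(row), {})} for row in target]
    if action in ("Update_Or_Insert", "Align_Records"):
        # insert_rows new_source_rows
        result += [source_by_key[k] for k in new_keys]
    if action == "Align_Records":
        # delete_unmatched_target_rows
        result = [row for row in result if key_of(row) in source_by_key]
    return result


source = [{"id": 1, "v": "a"}, {"id": 3, "v": "c"}]
target = [{"id": 1, "v": "x"}, {"id": 2, "v": "y"}]
print(apply_update_action("Update_Or_Insert", source, target, ["id"]))
print(apply_update_action("Align_Records", source, target, ["id"]))
```

With this data, `Update` would fail because the source row with `id` 3 has no match, and `Insert` would fail because `id` 1 is already present.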

## PRIVATE
type Append_Helper
## PRIVATE
Context source_table target_table key_columns

## PRIVATE
connection self = self.target_table.connection

## PRIVATE
The update only affects matched rows; unmatched rows are ignored.
update_common_rows self =
update_statement = self.connection.dialect.generate_sql <|
Query.Update_From_Table self.target_table.name self.source_table.name self.source_table.column_names self.key_columns
Panic.rethrow <| self.connection.execute_update update_statement

## PRIVATE
Inserts all rows from the source.

Behaviour is ill-defined if any of the rows already exist in the target.
If only new rows are supposed to be inserted, they have to be filtered
before inserting.
insert_rows self table_to_insert =
insert_statement = self.connection.dialect.generate_sql <|
Query.Insert_From_Select self.target_table.name table_to_insert.column_names table_to_insert.to_select_query
Panic.rethrow <| self.connection.execute_update insert_statement

## PRIVATE
Finds rows that are present in the source but not in the target.
new_source_rows self =
self.source_table.join self.target_table on=self.key_columns join_kind=Join_Kind.Left_Exclusive

## PRIVATE
Deletes rows from target table that were not present in the source.
delete_unmatched_target_rows self =
delete_statement = self.connection.dialect.generate_sql <|
Query.Delete_Unmatched_Rows self.target_table.name self.source_table.name self.key_columns
Panic.rethrow <| self.connection.execute_update delete_statement

## PRIVATE
Checks if any rows from the source table already exist in the target, and
if they do - raises an error.

Does nothing if `key_columns` is empty, as then there is no notion of
'matching' rows.
check_already_existing_rows self ~continuation =
if self.key_columns.is_empty then continuation else
joined = self.source_table.join self.target_table on=self.key_columns join_kind=Join_Kind.Inner
count = joined.row_count
if count == 0 then continuation else
Error.throw (Rows_Already_Present.Error count)

## PRIVATE
check_rows_unmatched_in_target self ~continuation =
# assert key_columns.not_empty
unmatched_rows = self.new_source_rows
count = unmatched_rows.row_count
if count != 0 then Error.throw (Unmatched_Rows.Error count) else continuation

## PRIVATE
Check if there are rows in source that match multiple rows in the target.
check_multiple_target_rows_match self ~continuation =
matched_rows = self.source_table.join self.target_table on=self.key_columns join_kind=Join_Kind.Inner
agg = (self.key_columns.map (Aggregate_Column.Group_By _)) + [Aggregate_Column.Count]
## This aggregation will only find duplicates in the target, not in the source,
because the source is already guaranteed to be unique - that was checked
when uploading the temporary table with the key as its primary key.
duplicated_key_in_target = matched_rows.aggregate agg . filter -1 (Filter_Condition.Greater than=1)
example = duplicated_key_in_target.read max_rows=1
case example.row_count == 0 of
False ->
row = duplicated_key_in_target.first_row . to_vector
offending_key = row.drop (Index_Sub_Range.Last 1)
count = row.last
Error.throw (Multiple_Target_Rows_Matched_For_Update.Error offending_key count)
True -> continuation
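
The check above amounts to an inner join, a group-by on the key and a filter on the count. A minimal SQL equivalent, run through Python's `sqlite3`, might look like the sketch below. The single `id` key, the table layout and the helper names are assumptions for illustration; the SQL that the Enso `join`/`aggregate`/`filter` pipeline actually emits may differ.

```python
import sqlite3


def make_db(source_ids, target_ids):
    # Hypothetical tables with a single key column `id`.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE source (id INTEGER PRIMARY KEY)")
    conn.execute("CREATE TABLE target (id INTEGER)")
    conn.executemany("INSERT INTO source VALUES (?)", [(i,) for i in source_ids])
    conn.executemany("INSERT INTO target VALUES (?)", [(i,) for i in target_ids])
    return conn


def find_duplicated_target_key(conn):
    """Return one (key, match_count) example, or None if every key matches at most once."""
    sql = """
        SELECT source.id, COUNT(*) AS n
        FROM source JOIN target ON source.id = target.id
        GROUP BY source.id
        HAVING COUNT(*) > 1
        ORDER BY source.id
        LIMIT 1
    """
    return conn.execute(sql).fetchone()


print(find_duplicated_target_key(make_db([1, 2], [1, 1, 2])))
print(find_duplicated_target_key(make_db([1, 2], [1, 2, 3])))
```

Because `source.id` is a primary key here, any count above one can only come from repeated keys in `target`, which is the same argument the comment in the diff makes about the temporary table.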

## PRIVATE
This helper ensures that all arguments are valid.

The `action` is run only if the input invariants are satisfied:
- all columns in `source_table` have a corresponding column in `target_table`
(with the same name),
- all `key_columns` are present in both source and target tables.
check_update_arguments source_table target_table key_columns update_action error_on_missing_columns on_problems ~action =
check_source_column source_column =
# The column must exist because it was verified earlier.
3 changes: 1 addition & 2 deletions distribution/lib/Standard/Table/0.0.0-dev/src/Errors.enso
Expand Up @@ -477,13 +477,12 @@ type Unmatched_Columns
merged tables.
Error (column_names : Vector Text)

# TODO [RW] the message is wrong for update_database_table_case, needs updating
## PRIVATE

Create a human-readable version of the error.
to_display_text : Text
to_display_text self =
"The following columns were not present in some of the provided tables: " + (self.column_names.map (n -> "["+n+"]") . join ", ") + ". The missing values have been filled with `Nothing`."
"The following columns were not present in some of the provided tables: " + (self.column_names.map (n -> "["+n+"]") . join ", ") + "."

type Cross_Join_Row_Limit_Exceeded
## PRIVATE