Implement Error-Handling for Database aggregations, unify some error …
…helpers across backends (#3371)
radeusgd committed Mar 31, 2022
1 parent 23e5216 commit 43265f1
Showing 24 changed files with 1,011 additions and 759 deletions.
@@ -3,6 +3,7 @@ from Standard.Base import all
import Standard.Base.Data.Locale
import Standard.Base.Data.Text.Regex
from Standard.Base.Error.Problem_Behavior as Problem_Behavior_Module import Problem_Behavior, Report_Warning
from Standard.Base.Error.Common import Wrapped_Dataflow_Error

## UNSTABLE
An error indicating that some criteria did not match any names in the input.
@@ -261,8 +262,25 @@ type Matcher
Regex_Matcher

## PRIVATE
match_criteria_implementation matcher objects criteria reorder=False name_mapper=(x->x) on_problems=Report_Warning = Panic.recover Any <|
[matcher, objects, criteria, reorder, name_mapper, on_problems] . each Panic.rethrow
match_criteria_implementation matcher objects criteria reorder=False name_mapper=(x->x) on_problems=Report_Warning =
result = here.internal_match_criteria_implementation matcher objects criteria reorder name_mapper
unmatched_criteria = result.second
problems = if unmatched_criteria.is_empty then [] else
[No_Matches_Found unmatched_criteria]
on_problems.attach_problems_after result.first problems
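The refactor above separates matching from problem reporting: the internal helper returns a pair of (result, unmatched criteria), and the caller decides how unmatched criteria surface. A Python sketch of that flow (not Enso — `Problem_Behavior` is richer than the stand-in here, and all names are illustrative):

```python
# Sketch of the split: the matcher returns (result, unmatched), and a
# problem-behavior callback decides how unmatched criteria are reported.
def match_criteria(internal_match, objects, criteria, on_problems):
    result, unmatched = internal_match(objects, criteria)
    problems = [("No_Matches_Found", unmatched)] if unmatched else []
    return on_problems(result, problems)

def report_warning(result, problems):
    # Stand-in for attach_problems_after with Report_Warning behaviour:
    # return the result together with the attached problems.
    return result, problems

matched, problems = match_criteria(
    lambda objs, crit: ([o for o in objs if o in crit],
                        [c for c in crit if c not in objs]),
    ["a", "b"], ["a", "z"], report_warning)
# matched == ["a"], problems == [("No_Matches_Found", ["z"])]
```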

## PRIVATE
match_criteria_callback matcher objects criteria problem_callback reorder=False name_mapper=(x->x) =
result = here.internal_match_criteria_implementation matcher objects criteria reorder name_mapper
unmatched_criteria = result.second
problem_callback unmatched_criteria
result.first

## PRIVATE
internal_match_criteria_implementation matcher objects criteria reorder=False name_mapper=(x->x) = Panic.catch Wrapped_Dataflow_Error (handler = x-> x.payload.unwrap) <|
## TODO [RW] discuss: this line also illustrates an issue we had with ensuring that input dataflow errors are correctly propagated. Later on we stopped doing (and testing for) this, as it was too cumbersome. Maybe it could be helped by an @Accepts_Error annotation similar to the one from the interpreter?
[matcher, objects, criteria, reorder, name_mapper] . each v->
Panic.rethrow (v.map_error Wrapped_Dataflow_Error)
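The `Wrapped_Dataflow_Error` trick can be pictured outside Enso: input errors are wrapped in a dedicated exception type before being raised, so a catch handler can tell them apart from genuine panics and give back the original payload. A hedged Python analogue (the names and the error-as-value representation are assumptions, not the Enso runtime's):

```python
class WrappedDataflowError(Exception):
    """Marks a dataflow error lifted to a panic, so it can be unwrapped."""
    def __init__(self, payload):
        self.payload = payload

def rethrow_wrapped(value):
    # If `value` carries an error, raise it wrapped; otherwise pass it through.
    if isinstance(value, Exception):
        raise WrappedDataflowError(value)
    return value

def match_criteria(inputs, body):
    try:
        for v in inputs:
            rethrow_wrapped(v)
        return body()
    except WrappedDataflowError as e:
        # Unwrap: propagate the original dataflow error, not the panic.
        return e.payload

# A plain input passes through; an error input short-circuits the body.
ok = match_criteria([1, 2], lambda: "matched")
err = match_criteria([1, ValueError("bad input")], lambda: "matched")
```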

# match_matrix . at i . at j specifies whether objects.at i matches criteria.at j
match_matrix = objects.map obj->
@@ -302,6 +320,4 @@ match_criteria_implementation matcher objects criteria reorder=False name_mapper
select_matching_indices is_object_matched_by_anything

result = selected_indices.map objects.at
problems = if unmatched_criteria.is_empty then [] else
[No_Matches_Found unmatched_criteria]
on_problems.attach_problems_after result problems
Pair result unmatched_criteria
9 changes: 4 additions & 5 deletions distribution/lib/Standard/Base/0.0.0-dev/src/Data/Vector.enso
@@ -1014,14 +1014,13 @@ type Builder
capacity = this.to_array.length

## Checks if this builder is empty.

> Example
Checking for emptiness.

[].is_empty
is_empty : Boolean
is_empty = this.length == 0

## Checks if this builder is not empty.
not_empty : Boolean
not_empty = this.is_empty.not

## Appends a new element into this builder and returns it, propagating any
errors that the provided element could have contained.

@@ -24,3 +24,12 @@ type Illegal_Argument_Error
- message: the error message explaining why the argument is illegal.
- cause: (optional) another error that is the cause of this one.
type Illegal_Argument_Error message cause=Nothing

## PRIVATE
Wraps a dataflow error lifted to a panic, making it possible to distinguish
it from other panics.
type Wrapped_Dataflow_Error payload

## PRIVATE
Throws the original error.
Wrapped_Dataflow_Error.unwrap = Error.throw this.payload
52 changes: 52 additions & 0 deletions distribution/lib/Standard/Base/0.0.0-dev/src/Warning.enso
@@ -133,3 +133,55 @@ map_attached_warnings_helper mapper value frames_to_drop =
original (unwrapped) warning instance.
Nothing -> warning.prim_warning
Prim_Warning.set value prim_mapped_warnings.to_array

## UNSTABLE
A helper function which selects warnings matching a predicate and returns a
pair whose first element is the original value with the matched warnings
removed and the second element is the list of matched warnings.

Arguments:
- value: the value whose warnings are to be filtered.
- predicate: a predicate specifying which warnings to detach. The predicate
receives the warnings' payloads as its argument.

> Example
Detach warnings of a specific type.

result = Warning.detach_selected_warnings value (_.is_a Illegal_State_Error)
result.first # `value` with the matched warnings removed
result.second # the list of matched warnings
detach_selected_warnings : Any -> (Any -> Boolean) -> Pair Any Vector
detach_selected_warnings value predicate =
warnings = here.get_all value
result = warnings.partition w-> predicate w.value
matched = result.first
remaining = result.second
Pair (here.set remaining value) matched
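The semantics of `detach_selected_warnings` can be sketched in Python by modeling warnings as a plain list carried next to a value; a predicate splits them into detached (matched) and remaining. The representation is an assumption for illustration only:

```python
# Split warnings by a predicate: return the value with only the unmatched
# warnings still attached, plus the list of matched (detached) warnings.
def detach_selected_warnings(value, warnings, predicate):
    matched = [w for w in warnings if predicate(w)]
    remaining = [w for w in warnings if not predicate(w)]
    return (value, remaining), matched

(value, remaining), matched = detach_selected_warnings(
    42, ["no matches: x", "deprecated"],
    lambda w: w.startswith("no matches"))
# matched == ["no matches: x"], remaining == ["deprecated"]
```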

## UNSTABLE
A helper function which gathers warnings matching some predicate and passes
them into a function which can aggregate them.

The merger function will not be called at all if no warnings match the
criteria.

Arguments:
- value: the value whose warnings will be transformed.
- matcher: a predicate selecting warnings to merge.
- merger: a function taking the list of matched payloads and returning the
list of new warnings to attach. It returns a list so that it is not
limited to merging the warnings into a single warning.

> Example
Merge `No_Matches_Found` warnings into a single such warning.

Warning.merge_matched_warnings value (_.is_a No_Matches_Found) warnings->
all_criteria = warnings.flat_map .criteria
[No_Matches_Found all_criteria]
merge_matched_warnings : Any -> (Any -> Boolean) -> (Vector -> Vector) -> Any
merge_matched_warnings value matcher merger =
result = here.detach_selected_warnings value matcher
if result.second.is_empty then result.first else
new_warnings = merger (result.second.map .value)
new_warnings.fold result.first acc-> warning->
Warning.attach warning acc
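Building on the detach step, `merge_matched_warnings` passes the matched payloads to a merger that may collapse them, then re-attaches the merger's output; the merger is skipped entirely when nothing matched, mirroring the documented behaviour. A Python sketch under the same list-of-warnings assumption:

```python
# Gather warnings matching `matcher`, let `merger` aggregate them, and
# attach the merger's output next to the warnings that did not match.
def merge_matched_warnings(value, warnings, matcher, merger):
    matched = [w for w in warnings if matcher(w)]
    remaining = [w for w in warnings if not matcher(w)]
    if not matched:
        return value, warnings  # merger is not called at all
    return value, remaining + merger(matched)

value, warns = merge_matched_warnings(
    "result",
    ["no match: a", "no match: b", "other"],
    lambda w: w.startswith("no match"),
    # Merge all No_Matches_Found-style warnings into a single one.
    lambda ws: ["no match: " + ", ".join(w.split(": ")[1] for w in ws)])
# warns == ["other", "no match: a, b"]
```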
@@ -158,13 +158,24 @@ type Connection
It creates a new table in the database with the given name (it will fail
if the table already exists), inserts the contents of the provided
in-memory table and returns a handle to the newly created table.
upload_table : Text -> Materialized_Table -> Integer -> Database_Table
upload_table name table batch_size=1000 = Panic.recover Illegal_State_Error <| here.handle_sql_errors <|

Arguments:
- name: The name of the table to create.
- table: An in-memory table specifying the contents to upload. The schema
of the created database table is based on the column types of this table.
- temporary: Specifies whether the table should be marked as temporary. A
temporary table will be dropped after the connection closes and will
usually not be visible to other connections.
- batch_size: Specifies how many rows should be uploaded in a single
batch.
upload_table : Text -> Materialized_Table -> Boolean -> Integer -> Database_Table
upload_table name table temporary=True batch_size=1000 = Panic.recover Illegal_State_Error <| here.handle_sql_errors <|
column_types = table.columns.map col-> here.default_storage_type col.storage_type
column_names = table.columns.map .name
col_makers = column_names.zip column_types name-> typ->
Base_Generator.wrap_in_quotes name ++ Sql.code " " ++ Sql.code typ.name
create_sql = (Sql.code "CREATE TABLE " ++ Base_Generator.wrap_in_quotes name ++ Sql.code " (" ++ (Sql.join ", " col_makers) ++ Sql.code ")").build
create_prefix = if temporary then "CREATE TEMPORARY TABLE " else "CREATE TABLE "
create_sql = (Sql.code create_prefix ++ Base_Generator.wrap_in_quotes name ++ Sql.code " (" ++ (Sql.join ", " col_makers) ++ Sql.code ")").build
Panic.rethrow <| this.execute_update create_sql
db_table = Panic.rethrow <| this.access_table name
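The `CREATE TABLE` statement assembled above can be sketched as plain string building. Quoting and type mapping are simplified assumptions here; the real implementation composes `Sql.code` fragments and resolves types via the dialect:

```python
# Build a CREATE [TEMPORARY] TABLE statement from (name, sql_type) pairs,
# double-quoting identifiers as the Enso code does via wrap_in_quotes.
def create_table_sql(name, columns, temporary=True):
    prefix = "CREATE TEMPORARY TABLE " if temporary else "CREATE TABLE "
    quote = lambda ident: '"' + ident.replace('"', '""') + '"'
    cols = ", ".join(quote(c) + " " + t for c, t in columns)
    return prefix + quote(name) + " (" + cols + ")"

sql = create_table_sql("people", [("id", "INTEGER"), ("name", "VARCHAR")])
# 'CREATE TEMPORARY TABLE "people" ("id" INTEGER, "name" VARCHAR)'
```

A temporary table is typically dropped automatically when the connection closes, which matches the new `temporary=True` default for uploads.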

@@ -4,9 +4,8 @@ from Standard.Table.Data.Aggregate_Column import all
import Standard.Database.Data.Internal.IR
from Standard.Database.Data.Sql import Sql_Type

make_aggregate_column : Table -> Aggregate_Column -> IR.Internal_Column
make_aggregate_column table aggregate =
new_name = aggregate.column_name table
make_aggregate_column : Table -> Aggregate_Column -> Text -> IR.Internal_Column
make_aggregate_column table aggregate new_name =
sql_type = table.connection.dialect.resolve_target_sql_type aggregate
expression = here.make_expression aggregate
IR.Internal_Column new_name sql_type expression
@@ -1,6 +1,6 @@
from Standard.Base import all

import Standard.Database.Data.Internal.Vector_Builder
import Standard.Table.Internal.Vector_Builder

polyglot java import java.sql.Types

34 changes: 18 additions & 16 deletions distribution/lib/Standard/Database/0.0.0-dev/src/Data/Table.enso
@@ -10,6 +10,7 @@ import Standard.Table.Internal.Java_Exports
import Standard.Table.Internal.Table_Helpers

import Standard.Table.Data.Aggregate_Column
import Standard.Table.Internal.Aggregate_Column_Helper
from Standard.Database.Data.Column as Column_Module import Column, Aggregate_Column_Builder
from Standard.Database.Data.Internal.IR import Internal_Column
from Standard.Table.Data.Table import No_Such_Column_Error
@@ -644,19 +645,17 @@ type Table
## Prototype Group By function
aggregate : [Aggregate_Column] -> Problem_Behavior -> Table
aggregate columns (on_problems=Report_Warning) =
## TODO handle errors here and turn them into warnings where applicable
_ = on_problems
resolved_aggregates = columns.map (_.resolve_columns this)
# TODO handling duplicate names etc. is to be done as part of https://www.pivotaltracker.com/story/show/181420794
# Grouping Key
is_a_key c = case c of
Aggregate_Column.Group_By _ _ -> True
_ -> False
key_columns = resolved_aggregates.filter is_a_key . map .column
key_expressions = key_columns.map .expression
new_ctx = this.context.set_groups key_expressions
new_columns = resolved_aggregates.map (Aggregate_Helper.make_aggregate_column this)
this.updated_context_and_columns new_ctx new_columns
validated = Aggregate_Column_Helper.prepare_aggregate_columns columns this
on_problems.attach_problems_before validated.problems <|
key_columns = validated.key_columns
resolved_aggregates = validated.valid_columns
key_expressions = key_columns.map .expression
new_ctx = this.context.set_groups key_expressions
new_columns = resolved_aggregates.map p->
agg = p.second
new_name = p.first
Aggregate_Helper.make_aggregate_column this agg new_name
this.updated_context_and_columns new_ctx new_columns
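The key shape change in this hunk is that validation now yields (new_name, resolved_aggregate) pairs, so the output name is threaded into column construction instead of being recomputed. A small Python sketch of that pair-unpacking step, with made-up helpers standing in for the Enso ones:

```python
# valid_columns: list of (new_name, aggregate_spec) pairs, as produced by
# a prepare/validate step; make_column builds one column per pair.
def build_aggregate_columns(valid_columns, make_column):
    return [make_column(agg, name) for name, agg in valid_columns]

cols = build_aggregate_columns(
    [("Count", {"op": "COUNT"}), ("Sum price", {"op": "SUM", "col": "price"})],
    lambda agg, name: (name, agg["op"]))
# cols == [("Count", "COUNT"), ("Sum price", "SUM")]
```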

## UNSTABLE

@@ -688,11 +687,14 @@

## Returns the amount of rows in this table.
row_count : Integer
row_count =
row_count = if this.internal_columns.is_empty then 0 else
expr = IR.Operation "COUNT_ROWS" []
column_name = "row_count"
virtual_column = IR.Internal_Column "1" Sql.Sql_Type.integer (IR.Constant Sql.Sql_Type.integer 1)
setup = this.context.as_subquery this.name [[virtual_column]]
## We need to keep some column in the subquery, as it determines whether
the query performs a regular selection or an aggregation. To avoid
unnecessary computation we pass only the first column rather than all
of them.
setup = this.context.as_subquery this.name [[this.internal_columns.first]]
new_ctx = IR.subquery_as_ctx setup.first
query = IR.Select [[column_name, expr]] new_ctx
sql = this.connection.dialect.generate_sql query
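The shape of the query that `row_count` builds can be sketched as SQL text: the original query is wrapped as a subquery carrying only its first column, and the `COUNT_ROWS` operation becomes a `COUNT(*)` over it. This is an assumed illustration of the shape, not the dialect's exact output:

```python
# Wrap an inner query, keeping a single column, and count its rows.
def row_count_sql(inner_sql, first_column):
    subquery = "(SELECT {col} FROM ({inner}) AS t) AS sub".format(
        col=first_column, inner=inner_sql)
    return "SELECT COUNT(*) AS row_count FROM " + subquery

sql = row_count_sql('SELECT "a", "b" FROM "tbl"', '"a"')
```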
@@ -1,9 +1,6 @@
from Standard.Base import all

from Standard.Table.Data.Column as Column_Module import Column
from Standard.Table.Data.Column_Selector as Column_Selector_Module import Column_Selector, By_Name, By_Index, By_Column
import Standard.Table.Internal.Table_Helpers
import Standard.Base.Error.Problem_Behavior

## Defines an Aggregate Column
type Aggregate_Column
@@ -173,89 +170,6 @@ type Aggregate_Column
- name: name of new column.
type Longest (column:Column|Text|Integer) (new_name:Text|Nothing=Nothing)

## Gets a column name to use for the aggregate column
column_name : Table->Text
column_name table =
if this.new_name.is_nothing.not then this.new_name else
get_name c = (this.resolve_column table c).name
case this of
Group_By c _ -> get_name c
Count _ -> "Count"
Count_Distinct columns _ _ ->
as_vector = case columns of
Vector.Vector _ -> columns
_ -> [columns]
"Count Distinct " + (as_vector.map get_name . join " ")
Percentile p c _ -> ((p*100).floor.to_text + "%-ile ") + get_name c
_ ->
prefix = Meta.get_simple_type_name this . replace "_" " "
prefix + " " + get_name this.column

## PRIVATE
Given a column reference resolve to the underlying column
resolve_column : Table->(Column|Text|Integer)->Column
resolve_column table column =
## TODO this should be able to handle problems too!
case column of
Text -> table.at column
Integer -> table.columns.at column
## A wildcard makes this work both with In-Memory and Database table columns.
_ -> table.at (column.name)

## PRIVATE
Returns a copy of this aggregate where all column descriptors (names,
indices or column references potentially from a different table) are
replaced with column references from the provided table.

This preprocess step is required by some helper function, to avoid having
to pass the table reference and resolve the column descriptors all the
time.

If some columns cannot be resolved, a dataflow error will be returned.
Higher-level methods can then handle this error by turning it into a
warning and ignoring the column.
resolve_columns : Table -> Aggregate_Column
resolve_columns table =
resolve : (Integer|Text|Column) -> Column
resolve c = this.resolve_column table c
resolve_selector_to_vector : Column_Selector -> [Column]
resolve_selector_to_vector selector =
Table_Helpers.select_columns_helper table.columns selector reorder=False on_problems=Problem_Behavior.Report_Error
resolve_selector_or_nothing selector = case selector of
Nothing -> Nothing
_ -> resolve_selector_to_vector selector
case this of
Group_By c new_name -> Group_By (resolve c) new_name
Count new_name -> Count new_name
Count_Distinct c new_name ignore_nothing ->
new_c = case c of
## TODO once we have sum type pattern matching this could be replaced with a single branch
By_Name _ _ -> resolve_selector_to_vector c
By_Index _ -> resolve_selector_to_vector c
By_Column _ -> resolve_selector_to_vector c
## TODO this is a temporary fix, remove it
Vector.Vector _ -> c.map resolve
_ -> [resolve c]
Count_Distinct new_c new_name ignore_nothing
Count_Not_Nothing c new_name -> Count_Not_Nothing (resolve c) new_name
Count_Nothing c new_name -> Count_Nothing (resolve c) new_name
Count_Not_Empty c new_name -> Count_Not_Empty (resolve c) new_name
Count_Empty c new_name -> Count_Empty (resolve c) new_name
Sum c new_name -> Sum (resolve c) new_name
Average c new_name -> Average (resolve c) new_name
Median c new_name -> Median (resolve c) new_name
Percentile p c new_name -> Percentile p (resolve c) new_name
Mode c new_name -> Mode (resolve c) new_name
Standard_Deviation c new_name population -> Standard_Deviation (resolve c) new_name population
Concatenate c new_name separator prefix suffix quote_char -> Concatenate (resolve c) new_name separator prefix suffix quote_char
First c new_name ignore_nothing order_by -> First (resolve c) new_name ignore_nothing (resolve_selector_or_nothing order_by)
Last c new_name ignore_nothing order_by -> Last (resolve c) new_name ignore_nothing (resolve_selector_or_nothing order_by)
Maximum c new_name -> Maximum (resolve c) new_name
Minimum c new_name -> Minimum (resolve c) new_name
Shortest c new_name -> Shortest (resolve c) new_name
Longest c new_name -> Longest (resolve c) new_name


## Occurs when a column cannot be aggregated.
type Invalid_Aggregation_Method (column : Text) (message : Text)

4 changes: 2 additions & 2 deletions distribution/lib/Standard/Table/0.0.0-dev/src/Data/Table.enso
@@ -490,13 +490,13 @@ type Table
## Prototype Group By function
aggregate : [Aggregate_Column] -> Problem_Behavior -> Table
aggregate columns (on_problems=Report_Warning) =
validated = Aggregate_Column_Helper.validate columns this
validated = Aggregate_Column_Helper.prepare_aggregate_columns columns this

make_key = if (validated.key_columns.length == 0) then _->(Group_By_Key.key [1]) else
i->(Group_By_Key.key (validated.key_columns.map v->(v.at i)))

new_table = validated.valid_columns.map c->[c.first, Vector.new_builder]
aggregators = validated.valid_columns.map c->(Aggregate_Column_Aggregator.new this c.second)
aggregators = validated.valid_columns.map c->(Aggregate_Column_Aggregator.new c.second)
add_row _ =
idx = new_table.at 0 . at 1 . length
0.up_to (aggregators.length) . each i->
@@ -24,10 +24,15 @@ calculate_percentile p:Decimal value:Map =
[new_v, new_s, new_t]
(output.second + (output.at 2 - output.second) * (mid_value - mid))

## Given a Table and a Column create an aggregator
new : Table->Aggregate_Column->Aggregate_Column_Aggregator
new table column =
create_closure c function:(Column->Any->Integer->Any) = function (column.resolve_column table c)
## PRIVATE
Creates an aggregator from a resolved `Aggregate_Column`.

You may need to transform the column with `resolve_columns` first to make
sure that it is resolved.
new : Aggregate_Column->Aggregate_Column_Aggregator
new column =
## This could be removed completely, but it is being removed by another PR, so I'm not touching it too much here.
create_closure c function:(Column->Any->Integer->Any) = function c

is_empty s = if s.is_nothing then True else case s of
Text -> s.is_empty
@@ -42,10 +47,7 @@ new table column =
Count _ ->
create_aggregator initial=0 accumulator=(c->_->c+1)
Count_Distinct columns _ ignore_nothing ->
resolved = case columns of
Vector.Vector _ -> columns.map c->(column.resolve_column table c)
_ -> [column.resolve_column table columns]
key_maker i = Group_By_Key.key (resolved.map c->(c.at i))
key_maker i = Group_By_Key.key (columns.map c->(c.at i))
accumulator = case ignore_nothing of
False-> map->i->(map.insert (key_maker i) 1)
True-> map->i->