From 00dbf3e125ba7885dae59a0cfd2256978a186589 Mon Sep 17 00:00:00 2001 From: Anton Date: Tue, 25 Sep 2018 16:03:10 +0300 Subject: [PATCH] Implementation of over, partition by and window (#2618) --- integration_test/cases/windows.exs | 48 +++++++ lib/ecto/adapters/mysql/connection.ex | 47 ++++++- lib/ecto/adapters/postgres/connection.ex | 41 +++++- lib/ecto/query.ex | 26 +++- lib/ecto/query/api.ex | 162 +++++++++++++++++++++++ lib/ecto/query/builder.ex | 76 +++++++++++ lib/ecto/query/builder/windows.ex | 98 ++++++++++++++ lib/ecto/query/inspect.ex | 40 +++++- lib/ecto/query/planner.ex | 53 +++++++- test/ecto/adapters/mysql_test.exs | 106 +++++++++++++++ test/ecto/adapters/postgres_test.exs | 106 +++++++++++++++ test/ecto/query/builder/windows_test.exs | 39 ++++++ test/ecto/query/builder_test.exs | 19 +++ test/ecto/query/inspect_test.exs | 28 ++++ 14 files changed, 877 insertions(+), 12 deletions(-) create mode 100644 integration_test/cases/windows.exs create mode 100644 lib/ecto/query/builder/windows.ex create mode 100644 test/ecto/query/builder/windows_test.exs diff --git a/integration_test/cases/windows.exs b/integration_test/cases/windows.exs new file mode 100644 index 0000000000..56d4ae9be9 --- /dev/null +++ b/integration_test/cases/windows.exs @@ -0,0 +1,48 @@ +defmodule Ecto.Integration.WindowsTest do + use Ecto.Integration.Case, async: Application.get_env(:ecto, :async_integration_tests, true) + + alias Ecto.Integration.TestRepo + import Ecto.Query + + alias Ecto.Integration.Comment + alias Ecto.Integration.User + + test "count over partition" do + u1 = TestRepo.insert!(%User{name: "Tester"}) + u2 = TestRepo.insert!(%User{name: "Developer"}) + c1 = TestRepo.insert!(%Comment{text: "1", author_id: u1.id}) + c2 = TestRepo.insert!(%Comment{text: "2", author_id: u1.id}) + c3 = TestRepo.insert!(%Comment{text: "3", author_id: u1.id}) + c4 = TestRepo.insert!(%Comment{text: "4", author_id: u2.id}) + + query = from(c in Comment, select: [c, count(c.id) |> over(partition_by(c.author_id))]) + + assert [[^c1, 3], [^c2, 3], [^c3, 3], [^c4, 1]] = TestRepo.all(query) + end + + test "last 2 of each author" do + u1 = TestRepo.insert!(%User{name: "Tester"}) + u2 = TestRepo.insert!(%User{name: "Developer"}) + TestRepo.insert!(%Comment{text: "1", author_id: u1.id}) + TestRepo.insert!(%Comment{text: "2", author_id: u1.id}) + TestRepo.insert!(%Comment{text: "3", author_id: u1.id}) + TestRepo.insert!(%Comment{text: "4", author_id: u2.id}) + + subquery = from(c in Comment, + windows: [rw: partition_by(c.author_id, order_by: :id)], + select: %{ + comment: c.text, + row: row_number() |> over(:rw), + total: count(c.id) |> over(partition_by(c.author_id)) + }, + where: c.author_id in [^u1.id, ^u2.id] + ) + + query = from(r in subquery(subquery), + select: r.comment, + where: (r.total - r.row) < 2 + ) + + assert ["2", "3", "4"] = TestRepo.all(query) + end +end diff --git a/lib/ecto/adapters/mysql/connection.ex b/lib/ecto/adapters/mysql/connection.ex index ada4869807..a778251219 100644 --- a/lib/ecto/adapters/mysql/connection.ex +++ b/lib/ecto/adapters/mysql/connection.ex @@ -86,12 +86,13 @@ if Code.ensure_loaded?(Mariaex) do where = where(query, sources) group_by = group_by(query, sources) having = having(query, sources) + window = window(query, sources) order_by = order_by(query, sources) limit = limit(query, sources) offset = offset(query, sources) lock = lock(query.lock) - [select, from, join, where, group_by, having, order_by, limit, offset | lock] + [select, from, join, where, group_by, having, window, order_by, limit, offset | lock] end def update_all(query, prefix \\ nil) do @@ -324,13 +325,34 @@ if Code.ensure_loaded?(Mariaex) do end)] end + defp window(%Query{windows: []}, _sources), do: [] + defp window(%Query{windows: windows} = query, sources) do + [" WINDOW " | + intersperse_map(windows, ", ", fn + {name, definition} -> + [quote_name(name), " AS ", partition_by(definition, sources, query)] end)] + end + + defp partition_by(%QueryExpr{expr: opts}, sources, %Query{} = query) do + fields = Keyword.get(opts, :fields) + order_bys = Keyword.get_values(opts, :order_by) |> Enum.concat + fields = fields |> intersperse_map(", ", &expr(&1, sources, query)) + ["(PARTITION BY ", + fields, + order_by(order_bys, query, sources), + ?)] + end + defp order_by(%Query{order_bys: []}, _sources), do: [] defp order_by(%Query{order_bys: order_bys} = query, sources) do + order_bys = Enum.flat_map(order_bys, & &1.expr) + order_by(order_bys, query, sources) + end + + defp order_by([], _query, _sources), do: [] + defp order_by(order_bys, query, sources) do [" ORDER BY " | - intersperse_map(order_bys, ", ", fn - %QueryExpr{expr: expr} -> - intersperse_map(expr, ", ", &order_by_expr(&1, sources, query)) - end)] + intersperse_map(order_bys, ", ", &order_by_expr(&1, sources, query))] end defp order_by_expr({dir, expr}, sources, query) do @@ -461,6 +483,21 @@ if Code.ensure_loaded?(Mariaex) do error!(query, "ilike is not supported by MySQL") end + defp expr({:over, _, [agg, %QueryExpr{} = window]}, sources, query) do + aggregate = expr(agg, sources, query) + [aggregate, " OVER ", partition_by(window, sources, query)] + end + + defp expr({:over, _, [agg, nil]}, sources, query) do + aggregate = expr(agg, sources, query) + [aggregate, " OVER ()"] + end + + defp expr({:over, _, [agg, name]}, sources, query) when is_atom(name) do + aggregate = expr(agg, sources, query) + [aggregate, " OVER ", quote_name(name)] + end + defp expr({:{}, _, elems}, sources, query) do [?(, intersperse_map(elems, ?,, &expr(&1, sources, query)), ?)] end diff --git a/lib/ecto/adapters/postgres/connection.ex b/lib/ecto/adapters/postgres/connection.ex index d305925529..88d5afeddc 100644 --- a/lib/ecto/adapters/postgres/connection.ex +++ b/lib/ecto/adapters/postgres/connection.ex @@ -103,12 +103,13 @@ if Code.ensure_loaded?(Postgrex) do where = where(query, sources) group_by = group_by(query, sources) having = having(query, sources) + window = window(query, sources) order_by = order_by(query, order_by_distinct, sources) limit = limit(query, sources) offset = offset(query, sources) lock = lock(query.lock) - [select, from, join, where, group_by, having, order_by, limit, offset | lock] + [select, from, join, where, group_by, having, window, order_by, limit, offset | lock] end def update_all(%{from: %{source: source}} = query, prefix \\ nil) do @@ -364,9 +365,32 @@ if Code.ensure_loaded?(Postgrex) do end)] end + defp window(%Query{windows: []}, _sources), do: [] + defp window(%Query{windows: windows} = query, sources) do + [" WINDOW " | + intersperse_map(windows, ", ", fn + {name, definition} -> + [quote_name(name), " AS ", partition_by(definition, sources, query)] end)] + end + + defp partition_by(%QueryExpr{expr: opts}, sources, %Query{} = query) do + fields = Keyword.get(opts, :fields) + order_bys = Keyword.get_values(opts, :order_by) |> Enum.concat + fields = fields |> intersperse_map(", ", &expr(&1, sources, query)) + ["(PARTITION BY ", + fields, + order_by(order_bys, query, [], sources), + ?)] + end + defp order_by(%Query{order_bys: []}, _distinct, _sources), do: [] defp order_by(%Query{order_bys: order_bys} = query, distinct, sources) do order_bys = Enum.flat_map(order_bys, & &1.expr) + order_by(order_bys, query, distinct, sources) + end + + defp order_by([], _query, _distinct, _sources), do: [] + defp order_by(order_bys, query, distinct, sources) do [" ORDER BY " | intersperse_map(distinct ++ order_bys, ", ", &order_by_expr(&1, sources, query))] end @@ -492,6 +516,21 @@ if Code.ensure_loaded?(Postgrex) do [aggregate, " FILTER (WHERE ", expr(filter, sources, query), ?)] end + defp expr({:over, _, [agg, %QueryExpr{} = window]}, sources, query) do + aggregate = expr(agg, sources, query) + [aggregate, " OVER ", partition_by(window, sources, query)] + end + + defp expr({:over, _, [agg, nil]}, sources, query) do + aggregate = expr(agg, sources, query) + [aggregate, " OVER ()"] + end + + defp expr({:over, _, [agg, name]}, sources, query) when is_atom(name) do + aggregate = expr(agg, sources, query) + [aggregate, " OVER ", quote_name(name)] + end + defp expr({:{}, _, elems}, sources, query) do [?(, intersperse_map(elems, ?,, &expr(&1, sources, query)), ?)] end diff --git a/lib/ecto/query.ex b/lib/ecto/query.ex index deb39a3e9e..2b0c3295f1 100644 --- a/lib/ecto/query.ex +++ b/lib/ecto/query.ex @@ -354,7 +354,7 @@ defmodule Ecto.Query do defstruct [prefix: nil, sources: nil, from: nil, joins: [], aliases: %{}, wheres: [], select: nil, order_bys: [], limit: nil, offset: nil, group_bys: [], updates: [], - havings: [], preloads: [], assocs: [], distinct: nil, lock: nil] + havings: [], preloads: [], assocs: [], distinct: nil, lock: nil, windows: []] defmodule FromExpr do @moduledoc false @@ -398,7 +398,7 @@ defmodule Ecto.Query do @opaque dynamic :: %DynamicExpr{} alias Ecto.Query.Builder - alias Ecto.Query.Builder.{Distinct, Dynamic, Filter, From, GroupBy, Join, + alias Ecto.Query.Builder.{Distinct, Dynamic, Filter, From, GroupBy, Join, Windows, LimitOffset, Lock, OrderBy, Preload, Select, Update} @doc """ @@ -464,6 +464,26 @@ defmodule Ecto.Query do Dynamic.build(binding, expr, __CALLER__) end + @doc """ + Defines windows which can be used with `Ecto.Query.API.over/2`. + + Receives a keyword list where keys are names of the windows + and values are `Ecto.Query.API.partition_by/2` expression. + + ## Examples + + # Compare each employee's salary with the average salary in his or her department + from e in Employee, + select: {e.depname, e.empno, e.salary, avg(e.salary) |> over(:department)}, + windows: [department: partition_by(e.depname)] + + Note: MySQL older than 8.0 doesn't support window functions, + so you can use it only with MySQL newer than 8.0 or with any version of PostgreSQL. + """ + defmacro windows(query, binding \\ [], expr) do + Windows.build(query, binding, expr, __CALLER__) + end + @doc """ Converts a query into a subquery. @@ -648,7 +668,7 @@ defmodule Ecto.Query do @from_join_opts [:as, :prefix, :hints] @no_binds [:lock] - @binds [:where, :or_where, :select, :distinct, :order_by, :group_by] ++ + @binds [:where, :or_where, :select, :distinct, :order_by, :group_by, :windows] ++ [:having, :or_having, :limit, :offset, :preload, :update, :select_merge] defp from([{type, expr}|t], env, count_bind, quoted, binds) when type in @binds do diff --git a/lib/ecto/query/api.ex b/lib/ecto/query/api.ex index 9279009c5a..774a335f8f 100644 --- a/lib/ecto/query/api.ex +++ b/lib/ecto/query/api.ex @@ -176,6 +176,168 @@ defmodule Ecto.Query.API do """ def coalesce(value, expr), do: doc! [value, expr] + @doc """ + Invokes specified aggregation function in context of window. + + Accepts any built-in or user-defined aggregate function as well as window functions. + + `window` can be either existing window name or `partition_by/2` expression. + """ + def over(agg_fun, window), do: doc! [agg_fun, window] + + @doc """ + Defines a partition for `over` or `window` clause. + + `fields` can be either atom or a list of atoms and refers + fields in the source which will be used for grouping rows. + + You can also specify an `:order_by` option which syntax and behaviour + are similar to `Ecto.Query.order_by/2` except the fact that it affects + rows only inside the window. + + from e in Employee, + select: { + e.depname, + e.empno, + row_number() |> over(partition_by(e.depname, order_by: e.name)) + } + """ + def partition_by(fields, opts), do: doc! [fields, opts] + + @doc """ + Returns number of the current row within its partition, counting from 1. + + from p in Post, + select: row_number() |> over(partition_by(p.category_id, order_by: p.date)) + + Note that this function must be invoked using window function syntax. + """ + def row_number(), do: doc! [] + + @doc """ + Returns rank of the current row with gaps; same as `row_number/0` of its first peer. + + from p in Post, + select: rank() |> over(partition_by(p.category_id, order_by: p.date)) + + Note that this function must be invoked using window function syntax. + """ + def rank(), do: doc! [] + + @doc """ + Returns rank of the current row without gaps; this function counts peer groups. + + from p in Post, + select: dense_rank() |> over(partition_by(p.category_id, order_by: p.date)) + + Note that this function must be invoked using window function syntax. + """ + def dense_rank(), do: doc! [] + + @doc """ + Returns relative rank of the current row: (rank - 1) / (total rows - 1). + + from p in Post, + select: percent_rank() |> over(partition_by(p.category_id, order_by: p.date)) + + Note that this function must be invoked using window function syntax. + """ + def percent_rank(), do: doc! [] + + @doc """ + Returns relative rank of the current row: + (number of rows preceding or peer with current row) / (total rows). + + from p in Post, + select: cume_dist() |> over(partition_by(p.category_id, order_by: p.date)) + + Note that this function must be invoked using window function syntax. + """ + def cume_dist(), do: doc! [] + + @doc """ + Returns integer ranging from 1 to the argument value, dividing the partition as equally as possible. + + from p in Post, + select: ntile(10) |> over(partition_by(p.category_id, order_by: p.date)) + + Note that this function must be invoked using window function syntax. + """ + def ntile(num_buckets), do: doc! [num_buckets] + + @doc """ + Returns value evaluated at the row that is the first row of the window frame. + + from p in Post, + select: first_value(p.id) |> over(partition_by(p.category_id, order_by: p.date)) + + Note that this function must be invoked using window function syntax. + """ + def first_value(value), do: doc! [value] + + @doc """ + Returns value evaluated at the row that is the last row of the window frame. + + from p in Post, + select: last_value(p.id) |> over(partition_by(p.category_id, order_by: p.date)) + + Note that this function must be invoked using window function syntax. + """ + def last_value(value), do: doc! [value] + + @doc """ + Returns value evaluated at the row that is the nth row of the window + frame (counting from 1); null if no such row. + + from p in Post, + select: nth_value(p.id, 4) |> over(partition_by(p.category_id, order_by: p.date)) + + Note that this function must be invoked using window function syntax. + """ + def nth_value(value, nth), do: doc! [value, nth] + + @doc """ + Returns value evaluated at the row that is offset rows before + the current row within the partition; if there is no such row, + instead return default (which must be of the same type as value). + Both offset and default are evaluated with respect to the current row. + If omitted, offset defaults to 1 and default to null. + + from e in Events, + windows: [w: partition_by(e.name, order_by: e.tick)], + select: { + e.tick, + e.action, + e.name, + lag(e.action) |> over(:w), # previous_action + lead(e.action) |> over(:w) # next_action + } + + Note that this function must be invoked using window function syntax. + """ + def lag(value, offset, default), do: doc! [value, offset, default] + + @doc """ + Returns value evaluated at the row that is offset rows after + the current row within the partition; if there is no such row, + instead return default (which must be of the same type as value). + Both offset and default are evaluated with respect to the current row. + If omitted, offset defaults to 1 and default to null. + + from e in Events, + windows: [w: partition_by(e.name, order_by: e.tick)], + select: { + e.tick, + e.action, + e.name, + lag(e.action) |> over(:w), # previous_action + lead(e.action) |> over(:w) # next_action + } + + Note that this function must be invoked using window function syntax. + """ + def lead(value, offset, default), do: doc! [value, offset, default] + @doc """ Applies the given expression as a FILTER clause against an aggregate. This is currently only supported by Postgres. diff --git a/lib/ecto/query/builder.ex b/lib/ecto/query/builder.ex index c27487ec74..ffc32aebf6 100644 --- a/lib/ecto/query/builder.ex +++ b/lib/ecto/query/builder.ex @@ -266,6 +266,35 @@ defmodule Ecto.Query.Builder do {expr, params_acc} end + @over_aggs [ + row_number: {[], :integer}, + rank: {[], :integer}, + dense_rank: {[], :integer}, + percent_rank: {[], :float}, + cume_dist: {[], :float}, + ntile: {[:integer], :integer}, + first_value: {[:any], :any}, + last_value: {[:any], :any}, + nth_value: {[:any, :integer], :any}, + lag: {[:any, :integer, :any], :any}, + lead: {[:any, :integer, :any], :any}, + lag: {[:any, :integer], :any}, + lead: {[:any, :integer], :any}, + lag: {[:any], :any}, + lead: {[:any], :any}, + ] + @over_agg_names Enum.uniq(Keyword.keys(@over_aggs)) + @over_agg_names_with_arity Enum.map(@over_aggs, fn {name, {in_types, _}} -> {name, length(in_types)} end) + + def escape({agg, _, nil}, _type, _params_acc, _vars, _env) when {agg, 0} in @over_agg_names_with_arity do + error! "#{agg}/#{0} must be invoked using window function syntax" + end + + def escape({agg, _, args}, _type, _params_acc, _vars, _env) when {agg, length(args)} in @over_agg_names_with_arity do + arity = length(args || []) + error! "#{agg}/#{arity} must be invoked using window function syntax" + end + def escape({:filter, _, [aggregate]}, type, params_acc, vars, env) do escape(aggregate, type, params_acc, vars, env) end @@ -282,6 +311,13 @@ defmodule Ecto.Query.Builder do {{:{}, [], [:coalesce, [], [left, right]]}, params_acc} end + def escape({:over, _, [{agg_name, _, agg_args} | over_args]}, type, params_acc, vars, env) do + aggregate = {agg_name, [], agg_args || []} + {aggregate, params_acc} = escape_window_function(aggregate, type, params_acc, vars, env) + {window, params_acc} = escape_window(over_args, type, params_acc, vars, env) + {{:{}, [], [:over, [], [aggregate, window]]}, params_acc} + end + def escape({:=, _, _} = expr, _type, _params_acc, _vars, _env) do error! "`#{Macro.to_string(expr)}` is not a valid query expression. " <> "The match operator is not supported: `=`. " <> @@ -431,6 +467,42 @@ defmodule Ecto.Query.Builder do defp split_fragment(<>, consumed), do: split_fragment(rest, consumed <> <>) + defp escape_window([], _type, params_acc, _vars, _env), + do: {nil, params_acc} + defp escape_window([window_name], _type, params_acc, _vars, _env) when is_atom(window_name), + do: {window_name, params_acc} + defp escape_window([{:partition_by, _, expr}], _type, params_acc, vars, env) do + Ecto.Query.Builder.Windows.escape_window(expr, params_acc, vars, env) + end + + defp escape_window_function({agg, _, nil} = expr, type, params_acc, vars, env) + when {agg, 0} in @over_agg_names_with_arity do + escape_window_function(expr, type, params_acc, vars, env) + end + + defp escape_window_function({agg, _, args} = expr, type, params_acc, vars, env) + when {agg, length(args)} in @over_agg_names_with_arity do + {in_types, out_type} = window_function_call_type(agg, length(args)) + assert_type!(expr, type, out_type) + + {args, params} = args + |> Enum.zip(in_types) + |> Enum.map_reduce(params_acc, fn {arg, type}, acc -> escape(arg, type, acc, vars, env) end) + {{:{}, [], [agg, [], args]}, params} + end + + defp escape_window_function({agg, _, args}, _type, _params, _vars, _env) when agg in @over_agg_names do + variants = @over_aggs + |> Keyword.get_values(agg) + |> Enum.map(fn {in_types, _} -> "\t* #{agg}/#{length(in_types)}" end) + |> Enum.join("\n") + error! "window function #{agg}/#{length(args)} is undefined. Did you mean one of: \n" <> variants + end + + defp escape_window_function(expr, type, params, vars, env) do + escape(expr, type, params, vars, env) + end + defp escape_call({name, _, args}, type, params, vars, env) do {args, params} = Enum.map_reduce(args, params, &escape(&1, type, &2, vars, env)) expr = {:{}, [], [name, [], args]} @@ -475,6 +547,10 @@ defmodule Ecto.Query.Builder do defp merge_fragments([h1], []), do: [{:raw, h1}] + for {agg, {in_types, out_type}} <- @over_aggs do + defp window_function_call_type(unquote(agg), unquote(length(in_types))), do: unquote({in_types, out_type}) + end + defp call_type(agg, 1) when agg in ~w(avg count max min sum)a, do: {:any, :any} defp call_type(comp, 2) when comp in ~w(== != < > <= >=)a, do: {:any, :boolean} defp call_type(like, 2) when like in ~w(like ilike)a, do: {:string, :boolean} diff --git a/lib/ecto/query/builder/windows.ex b/lib/ecto/query/builder/windows.ex new file mode 100644 index 0000000000..63f6dcdeea --- /dev/null +++ b/lib/ecto/query/builder/windows.ex @@ -0,0 +1,98 @@ +import Kernel, except: [apply: 2] + +defmodule Ecto.Query.Builder.Windows do + @moduledoc false + + alias Ecto.Query.Builder + alias Ecto.Query.Builder.OrderBy + + @doc """ + Escapes a window params. + + ## Examples + + iex> escape(quote do [x.x, [order_by: [desc: 13]]] end, {%{}, :acc}, [x: 0], __ENV__) + {[ + fields: [ + {:{}, [], + [{:{}, [], [:., [], [{:{}, [], [:&, [], [0]]}, :x]]}, [], []]} + ], + order_by: [desc: 13] + ], {%{}, :acc}} + + """ + @spec escape([Macro.t], {map, term}, Keyword.t, Macro.Env.t | {Macro.Env.t, fun}) :: {Macro.t, {map, term}} + def escape(args, params_acc, vars, env) do + {fields_exp, opts} = escape_args(args) + {fields, params_acc} = Builder.escape(fields_exp, :any, params_acc, vars, env) + {opts, params_acc} = Enum.map_reduce(opts, params_acc, &escape_option(&1, &2, vars, env)) + {[{:fields, fields} | opts], params_acc} + end + + defp escape_args([fields, opts]) when is_list(opts), do: {List.wrap(fields), opts} + defp escape_args([fields]), do: {List.wrap(fields), []} + + defp escape_option({:order_by, expr}, params_acc, vars, env) do + {expr, _} = OrderBy.escape(:order_by, expr, vars, env) + {{:order_by, expr}, params_acc} + end + + @spec escape_window(Macro.t, {map, term}, Keyword.t, Macro.Env.t | {Macro.Env.t, fun}) :: {Macro.t, {map, term}} + def escape_window(expr, params_acc, vars, {env, _}) do + escape_window(expr, params_acc, vars, env) + end + + def escape_window(expr, params_acc, vars, env) do + {expr, params_acc} = escape(expr, params_acc, vars, env) + params = Builder.escape_params(elem(params_acc, 0)) + + window = quote do: %Ecto.Query.QueryExpr{ + expr: unquote(expr), + params: unquote(params), + file: unquote(env.file), + line: unquote(env.line)} + {window, params_acc} + end + + @doc """ + Builds a quoted expression. + + The quoted expression should evaluate to a query at runtime. + If possible, it does all calculations at compile time to avoid + runtime work. + """ + @spec build(Macro.t, [Macro.t], Keyword.t, Macro.Env.t) :: Macro.t + def build(query, binding, windows, env) when is_list(windows) do + {query, binding} = Builder.escape_binding(query, binding, env) + windows = Enum.map(windows, &build_window(binding, &1, env)) + Builder.apply_query(query, __MODULE__, [windows], env) + end + + defp build_window(vars, {name, {:partition_by, _, expr}}, env) do + {window, _} = escape_window(expr, {%{}, :acc}, vars, env) + {name, window} + end + + @spec validate_windows!(Keyword.t, Keyword.t) :: Tuple.t + def validate_windows!([], _), do: :ok + def validate_windows!([{name, _} | rest], windows) do + if Keyword.has_key?(windows, name) do + Builder.error! "window with name #{name} is already defined" + end + + validate_windows!(rest, windows) + end + + @doc """ + The callback applied by `build/4` to build the query. + """ + @spec apply(Ecto.Queryable.t, Keyword.t) :: Ecto.Query.t + def apply(%Ecto.Query{windows: windows} = query, definitions) do + validate_windows!(definitions, windows) + %{query | windows: windows ++ definitions} + end + + def apply(query, definitions) do + apply(Ecto.Queryable.to_query(query), definitions) + end +end diff --git a/lib/ecto/query/inspect.ex b/lib/ecto/query/inspect.ex index 38784eace5..1ef8bd4a80 100644 --- a/lib/ecto/query/inspect.ex +++ b/lib/ecto/query/inspect.ex @@ -52,6 +52,7 @@ defimpl Inspect, for: Ecto.Query do joins = joins(query.joins, names) preloads = preloads(query.preloads) assocs = assocs(query.assocs, names) + windows = windows(query.windows, names) wheres = bool_exprs(%{and: :where, or: :or_where}, query.wheres, names) group_bys = kw_exprs(:group_by, query.group_bys, names) @@ -65,7 +66,7 @@ defimpl Inspect, for: Ecto.Query do select = kw_expr(:select, query.select, names) distinct = kw_expr(:distinct, query.distinct, names) - Enum.concat [from, joins, wheres, group_bys, havings, order_bys, + Enum.concat [from, joins, wheres, group_bys, havings, windows, order_bys, limit, offset, lock, distinct, updates, select, preloads, assocs] end @@ -121,6 +122,31 @@ defimpl Inspect, for: Ecto.Query do end end + defp windows(windows, names) do + Enum.map(windows, &window(&1, names)) + end + + defp window({name, definition}, names) do + {:windows, "[#{name}: " <> partition_by(definition, names) <> "]"} + end + + defp partition_by(%{ expr: opts } = part, names) do + fields = Keyword.get(opts, :fields) |> expr(names, part) + order_bys = Keyword.get_values(opts, :order_by) |> Enum.concat + partition_by(fields, order_bys, names, part) + end + + defp partition_by(fields, [], _names, _part) do + "partition_by(#{fields})" + end + + defp partition_by(fields, order_bys, names, part) do + order_string = order_bys + |> Enum.map(fn {key, value} -> "#{key}: #{expr(value, names, part)}" end) + |> Enum.join(", ") + "partition_by(#{fields}, order_by: [#{order_string}])" + end + defp bool_exprs(keys, exprs, names) do Enum.map exprs, fn %{expr: expr, op: op} = part -> {Map.fetch!(keys, op), expr(expr, names, part)} @@ -206,6 +232,18 @@ defimpl Inspect, for: Ecto.Query do {:type, [], [value, tag]} |> expr(names, part) end + defp expr_to_string({:over, _, [agg, nil]}, _string, names, part) do + "over(#{expr(agg, names, part)})" + end + + defp expr_to_string({:over, _, [agg, window]}, _string, names, part) when is_atom(window) do + "over(#{expr(agg, names, part)}, :#{window})" + end + + defp expr_to_string({:over, _, [agg, window]}, _string, names, part) do + "over(#{expr(agg, names, part)}, #{partition_by(window, names)})" + end + defp expr_to_string(_expr, string, _, _) do string end diff --git a/lib/ecto/query/planner.ex b/lib/ecto/query/planner.ex index cae1c885bc..96ca97d2de 100644 --- a/lib/ecto/query/planner.ex +++ b/lib/ecto/query/planner.ex @@ -4,7 +4,7 @@ defmodule Ecto.Query.Planner do alias Ecto.Query.{BooleanExpr, DynamicExpr, JoinExpr, QueryExpr, SelectExpr} - if map_size(%Ecto.Query{}) != 18 do + if map_size(%Ecto.Query{}) != 19 do raise "Ecto.Query match out of date in builder" end @@ -570,6 +570,20 @@ defmodule Ecto.Query.Planner do end end + defp merge_cache(:windows, query, exprs, {cache, params}, adapter) do + {expr_cache, {params, cacheable?}} = + Enum.map_reduce exprs, {params, true}, fn {_, expr}, {params, cacheable?} -> + {params, current_cacheable?} = cast_and_merge_params(:windows, query, expr, params, adapter) + {expr_to_cache(expr), {params, cacheable? and current_cacheable?}} + end + + case expr_cache do + [] -> {cache, params} + _ -> {merge_cache({:windows, expr_cache}, cache, cacheable?), params} + end + end + + defp expr_to_cache(%BooleanExpr{op: op, expr: expr}), do: {op, expr} defp expr_to_cache(%QueryExpr{expr: expr}), do: expr defp expr_to_cache(%SelectExpr{expr: expr}), do: expr @@ -787,6 +801,18 @@ defmodule Ecto.Query.Planner do end end + defp validate_and_increment(:windows, query, exprs, counter, _operation, adapter) do + {exprs, counter} = + Enum.reduce(exprs, {[], counter}, fn + {_, %{expr: []}}, {list, acc} -> + {list, acc} + {name, expr}, {list, acc} -> + {expr, acc} = prewalk(:windows, query, expr, acc, adapter) + {[{name, expr}|list], acc} + end) + {Enum.reverse(exprs), counter} + end + defp prewalk_source({:fragment, meta, fragments}, kind, query, expr, acc, adapter) do {fragments, acc} = prewalk(fragments, kind, query, expr, acc, adapter) {{:fragment, meta, fragments}, acc} @@ -1016,6 +1042,29 @@ defmodule Ecto.Query.Planner do {type, [expr | fields], from} end + # OVER () + defp collect_fields({:over, _, [call, nil]} = expr, fields, from, query, take) do + {type, _, _} = collect_fields(call, fields, from, query, take) + {type, [expr | fields], from} + end + + # OVER named_window + defp collect_fields({:over, _, [call, window_name]} = expr, + fields, from, %Ecto.Query{ windows: windows } = query, take) when is_atom(window_name) do + if Keyword.has_key?(windows, window_name) do + {type, _, _} = collect_fields(call, fields, from, query, take) + {type, [expr | fields], from} + else + error!(query, "the window :#{window_name} must be defined in `windows`") + end + end + + # OVER (PARTITION BY ...) + defp collect_fields({:over, _, [call, _]} = expr, fields, from, query, take) do + {type, _, _} = collect_fields(call, fields, from, query, take) + {type, [expr | fields], from} + end + defp collect_fields({{:., dot_meta, [{:&, _, [_]}, _]}, _, []} = expr, fields, from, _query, _take) do {{:value, Keyword.fetch!(dot_meta, :type)}, [expr | fields], from} @@ -1205,7 +1254,7 @@ defmodule Ecto.Query.Planner do ## Helpers @exprs [distinct: :distinct, select: :select, from: :from, join: :joins, - where: :wheres, group_by: :group_bys, having: :havings, + where: :wheres, group_by: :group_bys, having: :havings, windows: :windows, order_by: :order_bys, limit: :limit, offset: :offset] # Traverse all query components with expressions. diff --git a/test/ecto/adapters/mysql_test.exs b/test/ecto/adapters/mysql_test.exs index 5543cca735..b1561094b7 100644 --- a/test/ecto/adapters/mysql_test.exs +++ b/test/ecto/adapters/mysql_test.exs @@ -438,6 +438,112 @@ defmodule Ecto.Adapters.MySQLTest do assert delete_all(query) == ~s{DELETE s0.* FROM `first`.`schema` AS s0} end + ## Partitions and windows + + test "one window" do + query = Schema + |> select([r], r.x) + |> windows([r], w: partition_by r.x) + |> plan + + assert all(query) == ~s{SELECT s0.`x` FROM `schema` AS s0 WINDOW `w` AS (PARTITION BY s0.`x`)} + end + + test "two windows" do + query = Schema + |> select([r], r.x) + |> windows([r], w1: partition_by(r.x), w2: partition_by(r.y)) + |> plan() + assert all(query) == ~s{SELECT s0.`x` FROM `schema` AS s0 WINDOW `w1` AS (PARTITION BY s0.`x`), `w2` AS (PARTITION BY s0.`y`)} + end + + test "multifiled partition by" do + query = Schema + |> windows([r], w: partition_by [r.x, r.z]) + |> select([r], r.x) + |> plan() + assert all(query) == ~s{SELECT s0.`x` FROM `schema` AS s0 WINDOW `w` AS (PARTITION BY s0.`x`, s0.`z`)} + end + + test "count over all" do + query = Schema + |> select([r], count(r.x) |> over) + |> plan() + assert all(query) == ~s{SELECT count(s0.`x`) OVER () FROM `schema` AS s0} + end + + test "row_number over all" do + query = Schema + |> select(row_number |> over) + |> plan() + assert all(query) == ~s{SELECT row_number() OVER () FROM `schema` AS s0} + end + + test "nth_value over all" do + query = Schema + |> select([r], nth_value(r.x, 42) |> over) + |> plan() + assert all(query) == ~s{SELECT nth_value(s0.`x`, 42) OVER () FROM `schema` AS s0} + end + + test "lag/2 over all" do + query = Schema + |> select([r], lag(r.x, 42) |> over) + |> plan() + assert all(query) == ~s{SELECT lag(s0.`x`, 42) OVER () FROM `schema` AS s0} + end + + test "custom aggregation over all" do + query = Schema + |> select([r], fragment("custom_function(?)", r.x) |> over) + |> plan() + assert all(query) == ~s{SELECT custom_function(s0.`x`) OVER () FROM `schema` AS s0} + end + + test "count over partition by (inline)" do + query = Schema + |> select([r], count(r.x) |> over(partition_by [r.x, r.z])) + + query = query |> plan() + assert all(query) == ~s{SELECT count(s0.`x`) OVER (PARTITION BY s0.`x`, s0.`z`) FROM `schema` AS s0} + end + + test "count over window" do + query = Schema + |> windows([r], w: partition_by r.x) + |> select([r], count(r.x) |> over(:w)) + |> plan() + assert all(query) == ~s{SELECT count(s0.`x`) OVER `w` FROM `schema` AS s0 WINDOW `w` AS (PARTITION BY s0.`x`)} + end + + test "count over ordered window (keywords)" do + param = :b + query = from s in Schema, + windows: [w1: partition_by(s.a)], + windows: [w2: partition_by(s.x, order_by: s.a, order_by: [desc: ^param])], + select: (count(s.x) |> over(:w2)) + + query = query |> plan() + assert all(query) == ~s{SELECT count(s0.`x`) OVER `w2` FROM `schema` AS s0 WINDOW `w1` AS (PARTITION BY s0.`a`), `w2` AS (PARTITION BY s0.`x` ORDER BY s0.`a`, s0.`b` DESC)} + end + + test "count over unknown window" do + query = Schema + |> windows([r], w: partition_by r.x) + |> select([r], count(r.x) |> over(:v)) + assert_raise Ecto.QueryError, fn -> + plan(query) + end + end + + test "count over ordered partition" do + fields = [:y] + query = Schema + |> select([r], count(r.x) |> over(partition_by(r.x, order_by: ^fields, order_by: r.z))) + |> plan() + assert all(query) == ~s{SELECT count(s0.`x`) OVER (PARTITION BY s0.`x` ORDER BY s0.`y`, s0.`z`) FROM `schema` AS s0} + end + ## Joins test "join" do diff --git a/test/ecto/adapters/postgres_test.exs b/test/ecto/adapters/postgres_test.exs index 7ef873ce3b..08af4068a1 100644 --- a/test/ecto/adapters/postgres_test.exs +++ b/test/ecto/adapters/postgres_test.exs @@ -544,6 +544,112 @@ defmodule Ecto.Adapters.PostgresTest do assert delete_all(query) == ~s{DELETE FROM "first"."schema" AS s0} end + ## Partitions and windows + + test "one window" do + query = Schema + |> select([r], r.x) + |> windows([r], w: partition_by r.x) + |> plan + + assert all(query) == ~s{SELECT s0."x" FROM "schema" AS s0 WINDOW "w" AS (PARTITION BY s0."x")} + end + + test "two windows" do + query = Schema + |> select([r], r.x) + |> windows([r], w1: partition_by(r.x), w2: partition_by(r.y)) + |> plan() + assert all(query) == ~s{SELECT s0."x" FROM "schema" AS s0 WINDOW "w1" AS (PARTITION BY s0."x"), "w2" AS (PARTITION BY s0."y")} + end + + test "multifiled partition by" do + query = Schema + |> windows([r], w: partition_by [r.x, r.z]) + |> select([r], r.x) + |> plan() + assert all(query) == ~s{SELECT s0."x" FROM "schema" AS s0 WINDOW "w" AS (PARTITION BY s0."x", s0."z")} + end + + test "count over all" do + query = Schema + |> select([r], count(r.x) |> over) + |> plan() + assert all(query) == ~s{SELECT count(s0."x") OVER () FROM "schema" AS s0} + end + + test "row_number over all" do + query = Schema + |> select(row_number |> over) + |> plan() + assert all(query) == ~s{SELECT row_number() OVER () FROM "schema" AS s0} + end + + test "nth_value over all" do + query = Schema + |> select([r], nth_value(r.x, 42) |> over) + |> plan() + assert all(query) == ~s{SELECT nth_value(s0."x", 42) OVER () FROM "schema" AS s0} + end + + test "lag/2 over all" do + query = Schema + |> select([r], lag(r.x, 42) |> over) + |> plan() + assert all(query) == ~s{SELECT lag(s0."x", 42) OVER () FROM "schema" AS s0} + end + + test "custom aggregation over all" do + query = Schema + |> select([r], fragment("custom_function(?)", r.x) |> over) + |> plan() + assert all(query) == ~s{SELECT custom_function(s0."x") OVER () FROM "schema" AS s0} + end + + test "count over partition by (inline)" do + query = Schema + |> select([r], count(r.x) |> over(partition_by [r.x, r.z])) + + query = query |> plan() + assert all(query) == ~s{SELECT count(s0."x") OVER (PARTITION BY s0."x", s0."z") FROM "schema" AS s0} + end + + test "count over window" do + query = Schema + |> windows([r], w: partition_by r.x) + |> select([r], count(r.x) |> over(:w)) + |> plan() + assert all(query) == ~s{SELECT count(s0."x") OVER "w" FROM "schema" AS s0 WINDOW "w" AS (PARTITION BY s0."x")} + end + + test "count over ordered window (keywords)" do + param = :b + query = from s in Schema, + windows: [w1: partition_by(s.a)], + windows: [w2: partition_by(s.x, order_by: s.a, order_by: [desc: ^param])], + select: (count(s.x) |> over(:w2)) + + query = query |> plan() + assert all(query) == ~s{SELECT count(s0."x") OVER "w2" FROM "schema" AS s0 WINDOW "w1" AS (PARTITION BY s0."a"), "w2" AS (PARTITION BY s0."x" ORDER BY s0."a", s0."b" DESC)} + end + + test "count over unknown window" do + query = Schema + |> windows([r], w: partition_by r.x) + |> select([r], count(r.x) |> over(:v)) + assert_raise Ecto.QueryError, fn -> + plan(query) + end + end + + test "count over ordered partition" do + fields = [:y] + query = Schema + |> select([r], count(r.x) |> over(partition_by(r.x, order_by: ^fields, order_by: r.z))) + |> plan() + assert all(query) == ~s{SELECT count(s0."x") OVER (PARTITION BY s0."x" ORDER BY s0."y", s0."z") FROM "schema" AS s0} + end + ## Joins test "join" do diff --git a/test/ecto/query/builder/windows_test.exs b/test/ecto/query/builder/windows_test.exs new file mode 100644 index 0000000000..9f8f176b75 --- /dev/null +++ b/test/ecto/query/builder/windows_test.exs @@ -0,0 +1,39 @@ +defmodule Ecto.Query.Builder.WindowsTest do + use ExUnit.Case, async: true + + import Ecto.Query.Builder.Windows + doctest Ecto.Query.Builder.Windows + + import Ecto.Query + + defp escape(quoted, vars, env) do + {escaped, {params, :acc}} = escape(quoted, {%{}, :acc}, vars, env) + {escaped, params} + end + + describe "escape" do + test "handles expressions and params" do + assert {Macro.escape(quote do [fields: [&0.x]] end), %{}} == + escape(quote do [x.x] end, [x: 0], __ENV__) + + assert {Macro.escape(quote do [fields: [&0.x, &0.y]] end), %{}} == + escape(quote do [[x.x, x.y]] end, [x: 0], __ENV__) + + assert {Macro.escape(quote do [fields: [&0.x], order_by: [asc: &0.y]] end), %{}} == + escape(quote do [x.x, [order_by: x.y]] end, [x: 0], __ENV__) + + assert {Macro.escape(quote do [fields: [&0.x, &0.z], order_by: [asc: &0.y]] end), %{}} == + escape(quote do [[x.x, x.z], [order_by: x.y]] end, [x: 0], __ENV__) + + assert {Macro.escape(quote do [fields: [&0.x], order_by: [asc: &0.y], order_by: [asc: &0.z]] end), %{}} == + escape(quote do [x.x, [order_by: x.y, order_by: x.z]] end, [x: 0], __ENV__) + end + + test "raises on duplicate window" do + query = "q" |> windows([p], w: partition_by p.x) + assert_raise Ecto.Query.CompileError, ~r"window with name w is already defined", fn -> + query |> windows([p], w: partition_by p.y) + end + end + end +end diff --git a/test/ecto/query/builder_test.exs b/test/ecto/query/builder_test.exs index 53a0766dbf..98c8e586ad 100644 --- a/test/ecto/query/builder_test.exs +++ b/test/ecto/query/builder_test.exs @@ -67,6 +67,17 @@ defmodule Ecto.Query.BuilderTest do end end + test "escape over" do + assert {Macro.escape(quote(do: over(row_number(), nil))), %{}} == + escape(quote(do: over(row_number())), [], __ENV__) + + assert {Macro.escape(quote(do: over(nth_value(&0.id, 1), :w))), %{}} == + escape(quote(do: nth_value(x.id, 1) |> over(:w)), [x: 0], __ENV__) + + assert {Macro.escape(quote(do: over(count(&0.id), :w))), %{}} == + escape(quote(do: count(x.id) |> over(:w)), [x: 0], __ENV__) + end + test "escape type checks" do assert_raise Ecto.Query.CompileError, ~r"It returns a value of type :boolean but a value of type :integer is expected", fn -> escape(quote(do: ^1 == ^2), :integer, %{}, [], __ENV__) @@ -115,6 +126,14 @@ defmodule Ecto.Query.BuilderTest do fn -> escape(quote(do: Foo.bar(x)), [x: 0], __ENV__) |> elem(0) |> Code.eval_quoted([], __ENV__) end + + assert_raise Ecto.Query.CompileError, ~r"lag/2 must be invoked using window function syntax", fn -> + escape(quote(do: lag(:a, 1)), [], __ENV__) + end + + assert_raise Ecto.Query.CompileError, ~r"window function lag/0 is undefined.", fn -> + escape(quote(do: over(lag())), [], __ENV__) + end end test "doesn't escape interpolation" do diff --git a/test/ecto/query/inspect_test.exs b/test/ecto/query/inspect_test.exs index 6a7499ca6b..5c2244c685 100644 --- a/test/ecto/query/inspect_test.exs +++ b/test/ecto/query/inspect_test.exs @@ -144,6 +144,34 @@ defmodule Ecto.Query.InspectTest do ~s{from p in Inspect.Post, having: p.foo == p.bar, having: true} end + test "window" do + assert i(from(x in Post, windows: [a: partition_by x.foo])) == + ~s{from p in Inspect.Post, windows: [a: partition_by([p.foo])]} + + assert i(from(x in Post, windows: [a: partition_by(x.foo), b: partition_by(x.bar)])) == + ~s{from p in Inspect.Post, windows: [a: partition_by([p.foo])], windows: [b: partition_by([p.bar])]} + + assert i(from(x in Post, windows: [a: partition_by(x.foo)], windows: [b: partition_by(x.bar)])) == + ~s{from p in Inspect.Post, windows: [a: partition_by([p.foo])], windows: [b: partition_by([p.bar])]} + + assert i(from(x in Post, windows: [a: partition_by(x.foo, order_by: x.bar)])) == + ~s{from p in Inspect.Post, windows: [a: partition_by([p.foo], order_by: [asc: p.bar])]} + + assert i(from(x in Post, windows: [a: partition_by([x.foo, x.bar], order_by: x.bar)])) == + ~s{from p in Inspect.Post, windows: [a: partition_by([p.foo, p.bar], order_by: [asc: p.bar])]} + end + + test "over" do + assert i(from(x in Post, select: count(x.x) |> over)) == + ~s{from p in Inspect.Post, select: over(count(p.x))} + + assert i(from(x in Post, select: count(x.x) |> over(partition_by(x.bar)))) == + ~s{from p in Inspect.Post, select: over(count(p.x), partition_by([p.bar]))} + + assert i(from(x in Post, select: count(x.x) |> over(partition_by([x.foo, x.bar], order_by: x.bar)))) == + ~s{from p in Inspect.Post, select: over(count(p.x), partition_by([p.foo, p.bar], order_by: [asc: p.bar]))} + end + test "order by" do assert i(from(x in Post, order_by: [asc: x.foo, desc: x.bar], order_by: x.foobar)) == ~s{from p in Inspect.Post, order_by: [asc: p.foo, desc: p.bar], order_by: [asc: p.foobar]}