Skip to content

Commit

Permalink
Implementation of over, partition by and window (#2618)
Browse files Browse the repository at this point in the history
  • Loading branch information
Anber authored and josevalim committed Sep 25, 2018
1 parent 2defabe commit 00dbf3e
Show file tree
Hide file tree
Showing 14 changed files with 877 additions and 12 deletions.
48 changes: 48 additions & 0 deletions integration_test/cases/windows.exs
@@ -0,0 +1,48 @@
defmodule Ecto.Integration.WindowsTest do
use Ecto.Integration.Case, async: Application.get_env(:ecto, :async_integration_tests, true)

alias Ecto.Integration.TestRepo
import Ecto.Query

alias Ecto.Integration.Comment
alias Ecto.Integration.User

test "count over partition" do
u1 = TestRepo.insert!(%User{name: "Tester"})
u2 = TestRepo.insert!(%User{name: "Developer"})
c1 = TestRepo.insert!(%Comment{text: "1", author_id: u1.id})
c2 = TestRepo.insert!(%Comment{text: "2", author_id: u1.id})
c3 = TestRepo.insert!(%Comment{text: "3", author_id: u1.id})
c4 = TestRepo.insert!(%Comment{text: "4", author_id: u2.id})

query = from(c in Comment, select: [c, count(c.id) |> over(partition_by(c.author_id))])

assert [[^c1, 3], [^c2, 3], [^c3, 3], [^c4, 1]] = TestRepo.all(query)
end

test "last 2 of each author" do
u1 = TestRepo.insert!(%User{name: "Tester"})
u2 = TestRepo.insert!(%User{name: "Developer"})
TestRepo.insert!(%Comment{text: "1", author_id: u1.id})
TestRepo.insert!(%Comment{text: "2", author_id: u1.id})
TestRepo.insert!(%Comment{text: "3", author_id: u1.id})
TestRepo.insert!(%Comment{text: "4", author_id: u2.id})

subquery = from(c in Comment,
windows: [rw: partition_by(c.author_id, order_by: :id)],
select: %{
comment: c.text,
row: row_number() |> over(:rw),
total: count(c.id) |> over(partition_by(c.author_id))
},
where: c.author_id in [^u1.id, ^u2.id]
)

query = from(r in subquery(subquery),
select: r.comment,
where: (r.total - r.row) < 2
)

assert ["2", "3", "4"] = TestRepo.all(query)
end
end
47 changes: 42 additions & 5 deletions lib/ecto/adapters/mysql/connection.ex
Expand Up @@ -86,12 +86,13 @@ if Code.ensure_loaded?(Mariaex) do
where = where(query, sources)
group_by = group_by(query, sources)
having = having(query, sources)
window = window(query, sources)
order_by = order_by(query, sources)
limit = limit(query, sources)
offset = offset(query, sources)
lock = lock(query.lock)

[select, from, join, where, group_by, having, order_by, limit, offset | lock]
[select, from, join, where, group_by, having, window, order_by, limit, offset | lock]
end

def update_all(query, prefix \\ nil) do
Expand Down Expand Up @@ -324,13 +325,34 @@ if Code.ensure_loaded?(Mariaex) do
end)]
end

defp window(%Query{windows: []}, _sources), do: []
defp window(%Query{windows: windows} = query, sources) do
[" WINDOW " |
intersperse_map(windows, ", ", fn
{name, definition} ->
[quote_name(name), " AS ", partition_by(definition, sources, query)] end)]
end

defp partition_by(%QueryExpr{expr: opts}, sources, %Query{} = query) do
fields = Keyword.get(opts, :fields)
order_bys = Keyword.get_values(opts, :order_by) |> Enum.concat
fields = fields |> intersperse_map(", ", &expr(&1, sources, query))
["(PARTITION BY ",
fields,
order_by(order_bys, query, sources),
?)]
end

defp order_by(%Query{order_bys: []}, _sources), do: []
defp order_by(%Query{order_bys: order_bys} = query, sources) do
order_bys = Enum.flat_map(order_bys, & &1.expr)
order_by(order_bys, query, sources)
end

defp order_by([], _query, _sources), do: []
defp order_by(order_bys, query, sources) do
[" ORDER BY " |
intersperse_map(order_bys, ", ", fn
%QueryExpr{expr: expr} ->
intersperse_map(expr, ", ", &order_by_expr(&1, sources, query))
end)]
intersperse_map(order_bys, ", ", &order_by_expr(&1, sources, query))]
end

defp order_by_expr({dir, expr}, sources, query) do
Expand Down Expand Up @@ -461,6 +483,21 @@ if Code.ensure_loaded?(Mariaex) do
error!(query, "ilike is not supported by MySQL")
end

defp expr({:over, _, [agg, %QueryExpr{} = window]}, sources, query) do
aggregate = expr(agg, sources, query)
[aggregate, " OVER ", partition_by(window, sources, query)]
end

defp expr({:over, _, [agg, nil]}, sources, query) do
aggregate = expr(agg, sources, query)
[aggregate, " OVER ()"]
end

defp expr({:over, _, [agg, name]}, sources, query) when is_atom(name) do
aggregate = expr(agg, sources, query)
[aggregate, " OVER ", quote_name(name)]
end

defp expr({:{}, _, elems}, sources, query) do
[?(, intersperse_map(elems, ?,, &expr(&1, sources, query)), ?)]
end
Expand Down
41 changes: 40 additions & 1 deletion lib/ecto/adapters/postgres/connection.ex
Expand Up @@ -103,12 +103,13 @@ if Code.ensure_loaded?(Postgrex) do
where = where(query, sources)
group_by = group_by(query, sources)
having = having(query, sources)
window = window(query, sources)
order_by = order_by(query, order_by_distinct, sources)
limit = limit(query, sources)
offset = offset(query, sources)
lock = lock(query.lock)

[select, from, join, where, group_by, having, order_by, limit, offset | lock]
[select, from, join, where, group_by, having, window, order_by, limit, offset | lock]
end

def update_all(%{from: %{source: source}} = query, prefix \\ nil) do
Expand Down Expand Up @@ -364,9 +365,32 @@ if Code.ensure_loaded?(Postgrex) do
end)]
end

defp window(%Query{windows: []}, _sources), do: []
defp window(%Query{windows: windows} = query, sources) do
[" WINDOW " |
intersperse_map(windows, ", ", fn
{name, definition} ->
[quote_name(name), " AS ", partition_by(definition, sources, query)] end)]
end

defp partition_by(%QueryExpr{expr: opts}, sources, %Query{} = query) do
fields = Keyword.get(opts, :fields)
order_bys = Keyword.get_values(opts, :order_by) |> Enum.concat
fields = fields |> intersperse_map(", ", &expr(&1, sources, query))
["(PARTITION BY ",
fields,
order_by(order_bys, query, [], sources),
?)]
end

defp order_by(%Query{order_bys: []}, _distinct, _sources), do: []
defp order_by(%Query{order_bys: order_bys} = query, distinct, sources) do
order_bys = Enum.flat_map(order_bys, & &1.expr)
order_by(order_bys, query, distinct, sources)
end

defp order_by([], _query, _distinct, _sources), do: []
defp order_by(order_bys, query, distinct, sources) do
[" ORDER BY " |
intersperse_map(distinct ++ order_bys, ", ", &order_by_expr(&1, sources, query))]
end
Expand Down Expand Up @@ -492,6 +516,21 @@ if Code.ensure_loaded?(Postgrex) do
[aggregate, " FILTER (WHERE ", expr(filter, sources, query), ?)]
end

defp expr({:over, _, [agg, %QueryExpr{} = window]}, sources, query) do
aggregate = expr(agg, sources, query)
[aggregate, " OVER ", partition_by(window, sources, query)]
end

defp expr({:over, _, [agg, nil]}, sources, query) do
aggregate = expr(agg, sources, query)
[aggregate, " OVER ()"]
end

defp expr({:over, _, [agg, name]}, sources, query) when is_atom(name) do
aggregate = expr(agg, sources, query)
[aggregate, " OVER ", quote_name(name)]
end

defp expr({:{}, _, elems}, sources, query) do
[?(, intersperse_map(elems, ?,, &expr(&1, sources, query)), ?)]
end
Expand Down
26 changes: 23 additions & 3 deletions lib/ecto/query.ex
Expand Up @@ -354,7 +354,7 @@ defmodule Ecto.Query do

defstruct [prefix: nil, sources: nil, from: nil, joins: [], aliases: %{}, wheres: [], select: nil,
order_bys: [], limit: nil, offset: nil, group_bys: [], updates: [],
havings: [], preloads: [], assocs: [], distinct: nil, lock: nil]
havings: [], preloads: [], assocs: [], distinct: nil, lock: nil, windows: []]

defmodule FromExpr do
@moduledoc false
Expand Down Expand Up @@ -398,7 +398,7 @@ defmodule Ecto.Query do
@opaque dynamic :: %DynamicExpr{}

alias Ecto.Query.Builder
alias Ecto.Query.Builder.{Distinct, Dynamic, Filter, From, GroupBy, Join,
alias Ecto.Query.Builder.{Distinct, Dynamic, Filter, From, GroupBy, Join, Windows,
LimitOffset, Lock, OrderBy, Preload, Select, Update}

@doc """
Expand Down Expand Up @@ -464,6 +464,26 @@ defmodule Ecto.Query do
Dynamic.build(binding, expr, __CALLER__)
end

@doc """
Defines windows which can be used with `Ecto.Query.API.over/2`.
Receives a keyword list where keys are names of the windows
and values are `Ecto.Query.API.partition_by/2` expression.
## Examples
# Compare each employee's salary with the average salary in his or her department
from e in Employee,
select: {e.depname, e.empno, e.salary, avg(e.salary) |> over(:department)},
windows: [department: partition_by(e.depname)]
Note: MySQL older than 8.0 doesn't support window functions,
so you can use it only with MySQL newer than 8.0 or with any version of PostgreSQL.
"""
defmacro windows(query, binding \\ [], expr) do
Windows.build(query, binding, expr, __CALLER__)
end

@doc """
Converts a query into a subquery.
Expand Down Expand Up @@ -648,7 +668,7 @@ defmodule Ecto.Query do

@from_join_opts [:as, :prefix, :hints]
@no_binds [:lock]
@binds [:where, :or_where, :select, :distinct, :order_by, :group_by] ++
@binds [:where, :or_where, :select, :distinct, :order_by, :group_by, :windows] ++
[:having, :or_having, :limit, :offset, :preload, :update, :select_merge]

defp from([{type, expr}|t], env, count_bind, quoted, binds) when type in @binds do
Expand Down

0 comments on commit 00dbf3e

Please sign in to comment.