From 573384497f11d592fa47ffe5f7d0017b4d22a0fe Mon Sep 17 00:00:00 2001 From: Ilyich Vismara Date: Thu, 30 May 2024 17:57:47 +0200 Subject: [PATCH 01/17] feat: bulk operations --- lib/arke_postgres.ex | 80 ++++++++++++++++++++-------------- lib/arke_postgres/arke_unit.ex | 63 ++++++++++++++++++-------- 2 files changed, 93 insertions(+), 50 deletions(-) diff --git a/lib/arke_postgres.ex b/lib/arke_postgres.ex index 0e8499d..0f37359 100644 --- a/lib/arke_postgres.ex +++ b/lib/arke_postgres.ex @@ -20,8 +20,8 @@ defmodule ArkePostgres do case check_env() do {:ok, nil} -> try do + projects = Query.get_project_record() - projects =Query.get_project_record() Enum.each(projects, fn %{id: project_id} = _project -> start_managers(project_id) end) @@ -31,10 +31,16 @@ defmodule ArkePostgres do _ in DBConnection.ConnectionError -> IO.inspect("ConnectionError") :error + err in Postgrex.Error -> - %{message: message,postgres: %{code: code, message: postgres_message}} = err - parsed_message = %{context: "postgrex_error", message: "#{message || postgres_message}"} - IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ]) + %{message: message, postgres: %{code: code, message: postgres_message}} = err + + parsed_message = %{ + context: "postgrex_error", + message: "#{message || postgres_message}" + } + + IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan]) :error end @@ -67,46 +73,60 @@ defmodule ArkePostgres do end end - defp start_managers(project_id) when is_binary(project_id), do: start_managers(String.to_atom(project_id)) + defp start_managers(project_id) when is_binary(project_id), + do: start_managers(String.to_atom(project_id)) + defp start_managers(project_id) do {parameters, arke_list, groups} = Query.get_manager_units(project_id) - Arke.handle_manager(parameters,project_id,:parameter) - Arke.handle_manager(arke_list,project_id,:arke) - Arke.handle_manager(groups,project_id,:group) - + Arke.handle_manager(parameters, project_id, :parameter) + Arke.handle_manager(arke_list, project_id, :arke) + Arke.handle_manager(groups, project_id, :group) end - def create(project, %{arke_id: arke_id} = unit) do + def create(project, unit, opts \\ []) + + def create(project, %{arke_id: arke_id} = unit, opts), + do: create(project, [unit], opts) + + def create(_, [], opts), do: get_operation_result([], [], opts[:bulk]) + + def create(project, [%{arke_id: arke_id} | _] = unit_list, opts) do arke = Arke.Boundary.ArkeManager.get(arke_id, project) - case handle_create(project, arke, unit) do - {:ok, unit} -> - {:ok, - Arke.Core.Unit.update(unit, metadata: Map.merge(unit.metadata, %{project: project}))} + + case handle_create(project, arke, unit_list) do + {:ok, valid, errors} -> + get_operation_result( + Enum.map(valid, fn unit -> + Arke.Core.Unit.update(unit, metadata: Map.merge(unit.metadata, %{project: project})) + end), + errors, + opts[:bulk] + ) {:error, errors} -> - {:error, handle_changeset_errros(errors)} + {:error, errors} end end defp handle_create( project, %{data: %{type: "table"}} = arke, - %{data: data, metadata: metadata} = unit + [%{data: data, metadata: metadata} = unit | _] = _ ) do + # todo: handle bulk? 
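# note: table-backed arkes are still written one row at a time here; only the head of unit_list is persisted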
# todo: remove once the project is not needed anymore data = data |> Map.merge(%{metadata: Map.delete(metadata, :project)}) |> data_as_klist Table.insert(project, arke, data) - {:ok, unit} + {:ok, [unit], []} end - defp handle_create(project, %{data: %{type: "arke"}} = arke, unit) do - case ArkeUnit.insert(project, arke, unit) do - {:ok, %{id: id, inserted_at: inserted_at, updated_at: updated_at}} -> - {:ok, - Arke.Core.Unit.update(unit, id: id, inserted_at: inserted_at, updated_at: updated_at)} + defp handle_create(project, %{data: %{type: "arke"}} = arke, unit_list) do + case ArkeUnit.insert(project, arke, unit_list) do + {:ok, records, valid, errors} -> + {:ok, valid, errors} - {:error, errors} -> + {:error, errors, _, _} -> {:error, errors} end end @@ -115,7 +135,10 @@ defmodule ArkePostgres do {:error, "arke type not supported"} end - def update(project, %{arke_id: arke_id} = unit) do + defp get_operation_result(valid, errors, true), do: {:ok, valid, errors} + defp get_operation_result(valid, errors, _), do: {:ok, List.first(valid)} + + def update(project, %{arke_id: arke_id} = unit, opts \\ []) do arke = Arke.Boundary.ArkeManager.get(arke_id, project) {:ok, unit} = handle_update(project, arke, unit) end @@ -147,7 +170,7 @@ defmodule ArkePostgres do {:error, "arke type not supported"} end - def delete(project, %{arke_id: arke_id} = unit) do + def delete(project, %{arke_id: arke_id} = unit, opts \\ []) do arke = Arke.Boundary.ArkeManager.get(arke_id, project) handle_delete(project, arke, unit) end @@ -202,13 +225,6 @@ defmodule ArkePostgres do Enum.to_list(data) end - defp handle_changeset_errros(errors)when is_binary(errors), do: errors - defp handle_changeset_errros(errors) do - Enum.map(errors, fn {field, detail} -> - "#{field}: #{render_detail(detail)}" - end) - end - defp render_detail({message, values}) do Enum.reduce(values, message, fn {k, v}, acc -> String.replace(acc, "%{#{k}}", to_string(v)) diff --git a/lib/arke_postgres/arke_unit.ex b/lib/arke_postgres/arke_unit.ex index 1506055..b7fc786 100644 --- a/lib/arke_postgres/arke_unit.ex +++ b/lib/arke_postgres/arke_unit.ex @@ -19,24 +19,51 @@ defmodule ArkePostgres.ArkeUnit do @record_fields [:id, :data, :metadata, :inserted_at, :updated_at] - def insert(project, arke, %{data: data} = unit) do - row = [ - id: handle_id(unit.id), - arke_id: Atom.to_string(unit.arke_id), - data: encode_unit_data(arke, data), - metadata: unit.metadata, - inserted_at: unit.inserted_at, - updated_at: unit.updated_at - ] - - case ArkePostgres.Repo.insert(ArkePostgres.Tables.ArkeUnit.changeset(Enum.into(row, %{})), - prefix: project - ) do - {:ok, record} -> - {:ok, record} - - {:error, changeset} -> - {:error, changeset.errors} + def insert(project, arke, unit_list) do + now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second) + + %{unit_list: updated_unit_list, records: records} = + Enum.reduce(unit_list, %{unit_list: [], records: []}, fn unit, acc -> + id = handle_id(unit.id) + + updated_unit = + unit |> Map.put(:id, id) |> Map.put(:inserted_at, now) |> Map.put(:updated_at, now) + + acc + |> Map.put(:unit_list, [updated_unit | acc.unit_list]) + |> Map.put(:records, [ + %{ + id: id, + arke_id: Atom.to_string(unit.arke_id), + data: encode_unit_data(arke, unit.data), + metadata: unit.metadata, + inserted_at: now, + updated_at: now + } + | acc.records + ]) + end) + + case( + ArkePostgres.Repo.insert_all( + ArkePostgres.Tables.ArkeUnit, + records, + prefix: project, + returning: true + ) + ) do + {0, _} -> + {:error, Error.create(:insert, "no 
records inserted"), nil} + + {_, inserted} -> + inserted_ids = Enum.map(inserted, & &1.id) + + {valid, errors} = + Enum.split_with(updated_unit_list, fn unit -> + unit.id in inserted_ids + end) + + {:ok, inserted, valid, errors} end end From 247c6126fdd1e2ccfa78f3ac777b6d71eaa3144d Mon Sep 17 00:00:00 2001 From: Ilyich Vismara Date: Fri, 31 May 2024 17:48:41 +0200 Subject: [PATCH 02/17] feat: handle bulk update --- lib/arke_postgres.ex | 40 +++++++++++++++++++++++------ lib/arke_postgres/arke_unit.ex | 46 +++++++++++++++++++++++++--------- 2 files changed, 66 insertions(+), 20 deletions(-) diff --git a/lib/arke_postgres.ex b/lib/arke_postgres.ex index 0f37359..ae1d4ad 100644 --- a/lib/arke_postgres.ex +++ b/lib/arke_postgres.ex @@ -135,19 +135,35 @@ defmodule ArkePostgres do {:error, "arke type not supported"} end - defp get_operation_result(valid, errors, true), do: {:ok, valid, errors} - defp get_operation_result(valid, errors, _), do: {:ok, List.first(valid)} + def update(project, unit, opts \\ []) + + def update(project, %{arke_id: arke_id} = unit, opts), + do: update(project, [unit], opts) + + def update(_, [], opts), do: get_operation_result([], [], opts[:bulk]) - def update(project, %{arke_id: arke_id} = unit, opts \\ []) do + def update(project, [%{arke_id: arke_id} | _] = unit_list, opts) do arke = Arke.Boundary.ArkeManager.get(arke_id, project) - {:ok, unit} = handle_update(project, arke, unit) + + case handle_update(project, arke, unit_list) do + {:ok, valid, errors} -> + get_operation_result( + valid, + errors, + opts[:bulk] + ) + + {:error, errors} -> + {:error, errors} + end end def handle_update( project, %{data: %{type: "table"}} = arke, - %{data: data, metadata: metadata} = unit + [%{data: data, metadata: metadata} = unit | _] = _ ) do + # todo: handle bulk? 
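# note: as with create, table-type updates currently act on the first unit of unit_list only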
data = unit |> filter_primary_keys(false) @@ -161,15 +177,23 @@ defmodule ArkePostgres do {:ok, unit} end - def handle_update(project, %{data: %{type: "arke"}} = arke, unit) do - ArkeUnit.update(project, arke, unit) - {:ok, unit} + def handle_update(project, %{data: %{type: "arke"}} = arke, unit_list) do + case ArkeUnit.update(project, arke, unit_list) do + {:ok, records, valid, errors} -> + {:ok, valid, errors} + + {:error, errors, _, _} -> + {:error, errors} + end end def handle_update(_, _, _) do {:error, "arke type not supported"} end + defp get_operation_result(valid, errors, true), do: {:ok, valid, errors} + defp get_operation_result(valid, errors, _), do: {:ok, List.first(valid)} + def delete(project, %{arke_id: arke_id} = unit, opts \\ []) do arke = Arke.Boundary.ArkeManager.get(arke_id, project) handle_delete(project, arke, unit) diff --git a/lib/arke_postgres/arke_unit.ex b/lib/arke_postgres/arke_unit.ex index b7fc786..383598b 100644 --- a/lib/arke_postgres/arke_unit.ex +++ b/lib/arke_postgres/arke_unit.ex @@ -73,18 +73,40 @@ defmodule ArkePostgres.ArkeUnit do # TODO handle error defp handle_id(id), do: id - def update(project, arke, %{data: data} = unit, where \\ []) do - where = Keyword.put_new(where, :arke_id, Atom.to_string(unit.arke_id)) - where = Keyword.put_new(where, :id, Atom.to_string(unit.id)) - - row = [ - data: encode_unit_data(arke, data), - metadata: Map.delete(unit.metadata, :project), - updated_at: unit.updated_at - ] - - query = from("arke_unit", where: ^where, update: [set: ^row]) - ArkePostgres.Repo.update_all(query, [], prefix: project) + def update(project, arke, unit_list) do + records = + Enum.map(unit_list, fn unit -> + %{ + id: to_string(unit.id), + arke_id: to_string(arke.id), + data: encode_unit_data(arke, unit.data), + metadata: Map.delete(unit.metadata, :project), + inserted_at: DateTime.to_naive(unit.inserted_at) |> NaiveDateTime.truncate(:second), + updated_at: DateTime.to_naive(unit.updated_at) + } + end) + + case ArkePostgres.Repo.insert_all( + ArkePostgres.Tables.ArkeUnit, + records, + prefix: project, + on_conflict: {:replace_all_except, [:id]}, + conflict_target: :id, + returning: true + ) do + {0, _} -> + {:error, Error.create(:insert, "no records inserted"), nil} + + {_, updated} -> + updated_ids = Enum.map(updated, & &1.id) + + {valid, errors} = + Enum.split_with(unit_list, fn unit -> + to_string(unit.id) in updated_ids + end) + + {:ok, updated, valid, errors} + end end def delete(project, arke, unit) do From 61a059ad3db566617b822b57a689ab10a0cdb927 Mon Sep 17 00:00:00 2001 From: Ilyich Vismara Date: Mon, 3 Jun 2024 12:54:09 +0200 Subject: [PATCH 03/17] feat: bulk delete --- lib/arke_postgres.ex | 34 ++++++++++++++++++++++++++++------ lib/arke_postgres/arke_unit.ex | 9 ++++++--- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/lib/arke_postgres.ex b/lib/arke_postgres.ex index ae1d4ad..04f512a 100644 --- a/lib/arke_postgres.ex +++ b/lib/arke_postgres.ex @@ -194,12 +194,34 @@ defmodule ArkePostgres do defp get_operation_result(valid, errors, true), do: {:ok, valid, errors} defp get_operation_result(valid, errors, _), do: {:ok, List.first(valid)} - def delete(project, %{arke_id: arke_id} = unit, opts \\ []) do + def delete(project, unit, opts \\ []) + + def delete(project, %{arke_id: arke_id} = unit, opts), do: delete(project, [unit], opts) + + def delete(project, [], opts), do: get_operation_result([], [], opts[:bulk]) + + def delete(project, [%{arke_id: arke_id} | _] = unit_list, opts) do arke = 
Arke.Boundary.ArkeManager.get(arke_id, project) - handle_delete(project, arke, unit) + + case handle_delete(project, arke, unit_list) do + {:ok, valid, errors} -> + get_operation_result( + valid, + errors, + opts[:bulk] + ) + + {:error, errors} -> + {:error, errors} + end end - defp handle_delete(project, %{data: %{type: "table"}} = arke, %{metadata: metadata} = unit) do + defp handle_delete( + project, + %{data: %{type: "table"}} = arke, + [%{metadata: metadata} = unit | _] = _ + ) do + # todo: handle bulk? metadata = Map.delete(metadata, :project) where = unit |> filter_primary_keys(true) |> Map.put_new(:metadata, metadata) |> data_as_klist @@ -210,9 +232,9 @@ defmodule ArkePostgres do end end - defp handle_delete(project, %{data: %{type: "arke"}} = arke, unit) do - case ArkeUnit.delete(project, arke, unit) do - {:ok, _} -> {:ok, nil} + defp handle_delete(project, %{data: %{type: "arke"}} = arke, unit_list) do + case ArkeUnit.delete(project, arke, unit_list) do + {:ok, _} -> {:ok, unit_list, []} {:error, msg} -> {:error, msg} end end diff --git a/lib/arke_postgres/arke_unit.ex b/lib/arke_postgres/arke_unit.ex index 383598b..cf662de 100644 --- a/lib/arke_postgres/arke_unit.ex +++ b/lib/arke_postgres/arke_unit.ex @@ -109,9 +109,12 @@ defmodule ArkePostgres.ArkeUnit do end end - def delete(project, arke, unit) do - where = [arke_id: Atom.to_string(arke.id), id: Atom.to_string(unit.id)] - query = from(a in "arke_unit", where: ^where) + def delete(project, arke, unit_list) do + query = + from(a in "arke_unit", + where: a.arke_id == ^Atom.to_string(arke.id), + where: a.id in ^Enum.map(unit_list, &Atom.to_string(&1.id)) + ) case ArkePostgres.Repo.delete_all(query, prefix: project) do {0, nil} -> From 273d10fe40d2050a2f2742db5edc5e61e7ad0f9b Mon Sep 17 00:00:00 2001 From: Ilyich Vismara Date: Tue, 4 Jun 2024 18:04:41 +0200 Subject: [PATCH 04/17] fix: adjust check uniques --- lib/arke_postgres.ex | 70 ++++++++++++---------------------- lib/arke_postgres/arke_unit.ex | 22 +++++++---- 2 files changed, 38 insertions(+), 54 deletions(-) diff --git a/lib/arke_postgres.ex b/lib/arke_postgres.ex index 04f512a..f08ee79 100644 --- a/lib/arke_postgres.ex +++ b/lib/arke_postgres.ex @@ -89,20 +89,21 @@ defmodule ArkePostgres do def create(project, %{arke_id: arke_id} = unit, opts), do: create(project, [unit], opts) - def create(_, [], opts), do: get_operation_result([], [], opts[:bulk]) + def create(_project, [], _opts), do: {:ok, 0, [], []} def create(project, [%{arke_id: arke_id} | _] = unit_list, opts) do arke = Arke.Boundary.ArkeManager.get(arke_id, project) - case handle_create(project, arke, unit_list) do - {:ok, valid, errors} -> - get_operation_result( - Enum.map(valid, fn unit -> - Arke.Core.Unit.update(unit, metadata: Map.merge(unit.metadata, %{project: project})) - end), - errors, - opts[:bulk] - ) + case handle_create(project, arke, unit_list, opts) do + {:ok, unit} -> + {:ok, + Arke.Core.Unit.update(unit, metadata: Map.merge(unit.metadata, %{project: project}))} + + {:ok, count, valid, errors} -> + {:ok, count, + Enum.map(valid, fn unit -> + Arke.Core.Unit.update(unit, metadata: Map.merge(unit.metadata, %{project: project})) + end), errors} {:error, errors} -> {:error, errors} @@ -112,26 +113,20 @@ defmodule ArkePostgres do defp handle_create( project, %{data: %{type: "table"}} = arke, - [%{data: data, metadata: metadata} = unit | _] = _ + [%{data: data, metadata: metadata} = unit | _] = _, + _opts ) do # todo: handle bulk? 
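# note: the new _opts argument (e.g. bulk: true) is accepted but ignored for table-type arkes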
# todo: remove once the project is not needed anymore data = data |> Map.merge(%{metadata: Map.delete(metadata, :project)}) |> data_as_klist Table.insert(project, arke, data) - {:ok, [unit], []} + {:ok, unit} end - defp handle_create(project, %{data: %{type: "arke"}} = arke, unit_list) do - case ArkeUnit.insert(project, arke, unit_list) do - {:ok, records, valid, errors} -> - {:ok, valid, errors} + defp handle_create(project, %{data: %{type: "arke"}} = arke, unit_list, opts), + do: ArkeUnit.insert(project, arke, unit_list, opts) - {:error, errors, _, _} -> - {:error, errors} - end - end - - defp handle_create(proj, arke, unit) do + defp handle_create(_project, _arke, _unit, _opts) do {:error, "arke type not supported"} end @@ -140,28 +135,18 @@ defmodule ArkePostgres do def update(project, %{arke_id: arke_id} = unit, opts), do: update(project, [unit], opts) - def update(_, [], opts), do: get_operation_result([], [], opts[:bulk]) + def update(_project, [], _opts), do: {:ok, 0, [], []} def update(project, [%{arke_id: arke_id} | _] = unit_list, opts) do arke = Arke.Boundary.ArkeManager.get(arke_id, project) - - case handle_update(project, arke, unit_list) do - {:ok, valid, errors} -> - get_operation_result( - valid, - errors, - opts[:bulk] - ) - - {:error, errors} -> - {:error, errors} - end + handle_update(project, arke, unit_list, opts) end def handle_update( project, %{data: %{type: "table"}} = arke, - [%{data: data, metadata: metadata} = unit | _] = _ + [%{data: data, metadata: metadata} = unit | _] = _, + _opts ) do # todo: handle bulk? data = @@ -177,17 +162,10 @@ defmodule ArkePostgres do {:ok, unit} end - def handle_update(project, %{data: %{type: "arke"}} = arke, unit_list) do - case ArkeUnit.update(project, arke, unit_list) do - {:ok, records, valid, errors} -> - {:ok, valid, errors} - - {:error, errors, _, _} -> - {:error, errors} - end - end + def handle_update(project, %{data: %{type: "arke"}} = arke, unit_list, opts), + do: ArkeUnit.update(project, arke, unit_list, opts) - def handle_update(_, _, _) do + def handle_update(_project, _arke, _unit, _opts) do {:error, "arke type not supported"} end diff --git a/lib/arke_postgres/arke_unit.ex b/lib/arke_postgres/arke_unit.ex index cf662de..61b678e 100644 --- a/lib/arke_postgres/arke_unit.ex +++ b/lib/arke_postgres/arke_unit.ex @@ -19,7 +19,7 @@ defmodule ArkePostgres.ArkeUnit do @record_fields [:id, :data, :metadata, :inserted_at, :updated_at] - def insert(project, arke, unit_list) do + def insert(project, arke, unit_list, opts \\ []) do now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second) %{unit_list: updated_unit_list, records: records} = @@ -53,9 +53,9 @@ defmodule ArkePostgres.ArkeUnit do ) ) do {0, _} -> - {:error, Error.create(:insert, "no records inserted"), nil} + {:error, Error.create(:insert, "no records inserted")} - {_, inserted} -> + {count, inserted} -> inserted_ids = Enum.map(inserted, & &1.id) {valid, errors} = @@ -63,7 +63,10 @@ defmodule ArkePostgres.ArkeUnit do unit.id in inserted_ids end) - {:ok, inserted, valid, errors} + case opts[:bulk] do + true -> {:ok, count, valid, errors} + _ -> {:ok, List.first(valid)} + end end end @@ -73,7 +76,7 @@ defmodule ArkePostgres.ArkeUnit do # TODO handle error defp handle_id(id), do: id - def update(project, arke, unit_list) do + def update(project, arke, unit_list, opts) do records = Enum.map(unit_list, fn unit -> %{ @@ -95,9 +98,9 @@ defmodule ArkePostgres.ArkeUnit do returning: true ) do {0, _} -> - {:error, Error.create(:insert, "no records inserted"), nil} + 
{:error, Error.create(:update, "no records updated")} - {_, updated} -> + {count, updated} -> updated_ids = Enum.map(updated, & &1.id) {valid, errors} = @@ -105,7 +108,10 @@ defmodule ArkePostgres.ArkeUnit do to_string(unit.id) in updated_ids end) - {:ok, updated, valid, errors} + case opts[:bulk] do + true -> {:ok, count, valid, errors} + _ -> {:ok, List.first(valid)} + end end end From cd04bd752cc9a4f824b0793a27c8fa18767c182f Mon Sep 17 00:00:00 2001 From: Ilyich Vismara Date: Tue, 18 Jun 2024 11:38:16 +0200 Subject: [PATCH 05/17] fix: delete return --- lib/arke_postgres.ex | 23 +++-------------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/lib/arke_postgres.ex b/lib/arke_postgres.ex index f08ee79..da93a11 100644 --- a/lib/arke_postgres.ex +++ b/lib/arke_postgres.ex @@ -169,29 +169,15 @@ defmodule ArkePostgres do {:error, "arke type not supported"} end - defp get_operation_result(valid, errors, true), do: {:ok, valid, errors} - defp get_operation_result(valid, errors, _), do: {:ok, List.first(valid)} - def delete(project, unit, opts \\ []) def delete(project, %{arke_id: arke_id} = unit, opts), do: delete(project, [unit], opts) - def delete(project, [], opts), do: get_operation_result([], [], opts[:bulk]) + def delete(project, [], opts), do: {:ok, nil} def delete(project, [%{arke_id: arke_id} | _] = unit_list, opts) do arke = Arke.Boundary.ArkeManager.get(arke_id, project) - - case handle_delete(project, arke, unit_list) do - {:ok, valid, errors} -> - get_operation_result( - valid, - errors, - opts[:bulk] - ) - - {:error, errors} -> - {:error, errors} - end + handle_delete(project, arke, unit_list) end defp handle_delete( @@ -211,10 +197,7 @@ defmodule ArkePostgres do end defp handle_delete(project, %{data: %{type: "arke"}} = arke, unit_list) do - case ArkeUnit.delete(project, arke, unit_list) do - {:ok, _} -> {:ok, unit_list, []} - {:error, msg} -> {:error, msg} - end + ArkeUnit.delete(project, arke, unit_list) end defp handle_delete(_, _, _) do From f21e6a958e50dbd2a9e23fbcc9207d59857e4113 Mon Sep 17 00:00:00 2001 From: Ilyich Vismara Date: Wed, 19 Jun 2024 14:26:44 +0200 Subject: [PATCH 06/17] fix: make Query handle_condition public --- lib/arke_postgres/query.ex | 218 +++++++++++++++++++++++-------------- 1 file changed, 137 insertions(+), 81 deletions(-) diff --git a/lib/arke_postgres/query.ex b/lib/arke_postgres/query.ex index a87661a..8686fbd 100644 --- a/lib/arke_postgres/query.ex +++ b/lib/arke_postgres/query.ex @@ -56,7 +56,10 @@ defmodule ArkePostgres.Query do do: get_table_column(parameter) def get_manager_units(project_id) do - arke_link =%{id: :arke_link, data: %{parameters: [%{id: :type},%{id: :child_id},%{id: :parent_id},%{id: :metadata}]}} + arke_link = %{ + id: :arke_link, + data: %{parameters: [%{id: :type}, %{id: :child_id}, %{id: :parent_id}, %{id: :metadata}]} + } links = from(q in table_query(arke_link, nil), where: q.type in ["parameter", "group"]) @@ -65,7 +68,6 @@ defmodule ArkePostgres.Query do parameter_links = Enum.filter(links, fn x -> x.type == "parameter" end) group_links = Enum.filter(links, fn x -> x.type == "group" end) - parameters_id = Arke.Utils.DefaultData.get_parameters_id() list_arke_id = Arke.Utils.DefaultData.get_arke_id() @@ -74,7 +76,6 @@ defmodule ArkePostgres.Query do from(q in base_query(), where: q.arke_id in ^list_arke_id) |> ArkePostgres.Repo.all(prefix: project_id) - parameters = parse_parameters(Enum.filter(unit_list, fn u -> u.arke_id in parameters_id end)) arke_list = @@ -82,6 +83,7 @@ defmodule 
ArkePostgres.Query do Enum.filter(unit_list, fn u -> u.arke_id == "arke" end), parameter_links ) + groups = parse_groups( Enum.filter(unit_list, fn u -> u.arke_id == "group" end), @@ -105,24 +107,40 @@ defmodule ArkePostgres.Query do Enum.filter(parameter_links, fn x -> x.parent_id == id end), [], fn p, new_params -> - [%{id: String.to_atom(p.child_id), metadata: Enum.reduce(p.metadata,%{}, fn {key, val}, acc -> Map.put(acc, String.to_atom(key), val) end)} | new_params] + [ + %{ + id: String.to_atom(p.child_id), + metadata: + Enum.reduce(p.metadata, %{}, fn {key, val}, acc -> + Map.put(acc, String.to_atom(key), val) + end) + } + | new_params + ] end ) - updated_data = Enum.reduce(unit.data,%{}, fn {k,db_data},acc -> Map.put(acc,String.to_atom(k),db_data["value"]) end) + + updated_data = + Enum.reduce(unit.data, %{}, fn {k, db_data}, acc -> + Map.put(acc, String.to_atom(k), db_data["value"]) + end) |> Map.put(:id, id) - |> Map.update(:parameters,[], fn current -> params ++ current end) + |> Map.update(:parameters, [], fn current -> params ++ current end) - [ updated_data | new_arke_list] + [updated_data | new_arke_list] end) end - defp parse_parameters(parameter_list)do + defp parse_parameters(parameter_list) do Enum.reduce(parameter_list, [], fn %{id: id, arke_id: arke_id} = unit, new_parameter_list -> + updated_data = + Enum.reduce(unit.data, %{}, fn {k, db_data}, acc -> + Map.put(acc, String.to_atom(k), db_data["value"]) + end) + |> Map.put(:id, id) + |> Map.put(:type, arke_id) - updated_data = Enum.reduce(unit.data,%{}, fn {k,db_data},acc -> Map.put(acc,String.to_atom(k),db_data["value"]) end) - |> Map.put(:id, id) - |> Map.put(:type,arke_id) - [ updated_data | new_parameter_list] + [updated_data | new_parameter_list] end) end @@ -136,22 +154,30 @@ defmodule ArkePostgres.Query do [%{id: String.to_atom(p.child_id), metadata: p.metadata} | new_params] end ) - updated_data = Enum.reduce(unit.data,%{}, fn {k,db_data},acc -> Map.put(acc,String.to_atom(k),db_data["value"]) end) - |> Map.put(:id, id) - |> Map.update(:arke_list,[], fn db_arke_list -> - Enum.reduce(db_arke_list,[], fn key,acc -> - case Enum.find(arke_list, fn %{id: id, metadata: _metadata} -> to_string(id) == key end) do - nil -> [key|acc] - data -> - [data|acc] - end - end) - end) - [ updated_data | new_groups] + + updated_data = + Enum.reduce(unit.data, %{}, fn {k, db_data}, acc -> + Map.put(acc, String.to_atom(k), db_data["value"]) + end) + |> Map.put(:id, id) + |> Map.update(:arke_list, [], fn db_arke_list -> + Enum.reduce(db_arke_list, [], fn key, acc -> + case Enum.find(arke_list, fn %{id: id, metadata: _metadata} -> + to_string(id) == key + end) do + nil -> + [key | acc] + + data -> + [data | acc] + end + end) + end) + + [updated_data | new_groups] end) end - ###################################################################################################################### # PRIVATE FUNCTIONS ################################################################################################## ###################################################################################################################### @@ -160,17 +186,31 @@ defmodule ArkePostgres.Query do defp base_query(%{link: nil} = _arke_query, action), do: arke_query(action) - defp base_query(%{link: %{unit: %{id: link_id},depth: depth, direction: direction,type: type}, project: project} = _arke_query, action), - do: - get_nodes( - project, - action, - [to_string(link_id)], - depth, - direction, - type - ) - defp base_query(%{link: %{unit: unit_list,depth: 
depth, direction: direction,type: type}, project: project} = _arke_query, action) when is_list(unit_list) do + defp base_query( + %{ + link: %{unit: %{id: link_id}, depth: depth, direction: direction, type: type}, + project: project + } = _arke_query, + action + ), + do: + get_nodes( + project, + action, + [to_string(link_id)], + depth, + direction, + type + ) + + defp base_query( + %{ + link: %{unit: unit_list, depth: depth, direction: direction, type: type}, + project: project + } = _arke_query, + action + ) + when is_list(unit_list) do get_nodes( project, action, @@ -247,7 +287,7 @@ defmodule ArkePostgres.Query do end) end - defp handle_condition(logic, base_filters) do + def handle_condition(logic, base_filters) do Enum.reduce(base_filters, nil, fn %{ parameter: parameter, operator: operator, @@ -263,7 +303,8 @@ defmodule ArkePostgres.Query do add_condition_to_clause(condition, clause, logic) else condition = - filter_query_by_operator(parameter, column, value, operator) |> handle_negate_condition(negate) + filter_query_by_operator(parameter, column, value, operator) + |> handle_negate_condition(negate) add_condition_to_clause(condition, clause, logic) end @@ -296,12 +337,12 @@ defmodule ArkePostgres.Query do defp get_table_column(%{id: id} = _parameter), do: dynamic([q], fragment("?", field(q, ^id))) defp get_arke_column(%{id: id, data: %{multiple: true}} = _parameter), - do: dynamic([q], fragment("(? -> ? ->> 'value')::jsonb", field(q, :data), ^Atom.to_string(id))) + do: + dynamic([q], fragment("(? -> ? ->> 'value')::jsonb", field(q, :data), ^Atom.to_string(id))) defp get_arke_column(%{id: id, arke_id: :string} = _parameter), do: dynamic([q], fragment("(? -> ? ->> 'value')::text", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :atom} = _parameter), do: dynamic([q], fragment("(? -> ? ->> 'value')::text", field(q, :data), ^Atom.to_string(id))) @@ -449,8 +490,11 @@ defmodule ArkePostgres.Query do fragment("? IS NULL AND (data \\? 
?)", ^column, ^Atom.to_string(id)) ) - defp filter_query_by_operator(%{data: %{multiple: true}}, column, value, :eq), do: dynamic([q], fragment("jsonb_exists(?, ?)", ^column, ^value)) - defp filter_query_by_operator(parameter, column, value, :eq), do: dynamic([q], ^column == ^value) + defp filter_query_by_operator(%{data: %{multiple: true}}, column, value, :eq), + do: dynamic([q], fragment("jsonb_exists(?, ?)", ^column, ^value)) + + defp filter_query_by_operator(parameter, column, value, :eq), + do: dynamic([q], ^column == ^value) defp filter_query_by_operator(parameter, column, value, :contains), do: dynamic([q], like(^column, fragment("?", ^("%" <> value <> "%")))) @@ -470,11 +514,18 @@ defmodule ArkePostgres.Query do defp filter_query_by_operator(parameter, column, value, :istartswith), do: dynamic([q], ilike(^column, fragment("?", ^(value <> "%")))) - defp filter_query_by_operator(parameter, column, value, :lte), do: dynamic([q], ^column <= ^value) + defp filter_query_by_operator(parameter, column, value, :lte), + do: dynamic([q], ^column <= ^value) + defp filter_query_by_operator(parameter, column, value, :lt), do: dynamic([q], ^column < ^value) defp filter_query_by_operator(parameter, column, value, :gt), do: dynamic([q], ^column > ^value) - defp filter_query_by_operator(parameter, column, value, :gte), do: dynamic([q], ^column >= ^value) - defp filter_query_by_operator(parameter, column, value, :in), do: dynamic([q], ^column in ^value) + + defp filter_query_by_operator(parameter, column, value, :gte), + do: dynamic([q], ^column >= ^value) + + defp filter_query_by_operator(parameter, column, value, :in), + do: dynamic([q], ^column in ^value) + defp filter_query_by_operator(parameter, column, value, _), do: dynamic([q], ^column == ^value) # defp filter_query_by_operator(query, key, value, "between"), do: from q in query, where: column_table(q, ^key) == ^value @@ -510,7 +561,9 @@ defmodule ArkePostgres.Query do where_field = get_where_field_by_direction(direction) |> get_where_condition_by_type(type) get_link_query(action, project, unit_id, link_field, tree_field, depth, where_field) end - def get_nodes(project, action, unit_id, depth, direction, type), do: get_nodes(project, action, [unit_id], depth, direction, type) + + def get_nodes(project, action, unit_id, depth, direction, type), + do: get_nodes(project, action, [unit_id], depth, direction, type) defp get_project(project) when is_atom(project), do: Atom.to_string(project) defp get_project(project), do: project @@ -556,40 +609,43 @@ defmodule ArkePostgres.Query do end defp get_link_query(_action, project, unit_id_list, link_field, tree_field, depth, where_field) do - q = from(r in from(a in "arke_unit", - left_join: - cte in fragment( - @raw_cte_query, - literal(^link_field), - literal(^project), - literal(^link_field), - ^unit_id_list, - literal(^project), - literal(^project), - literal(^project), - literal(^project), - literal(^project), - literal(^project), - literal(^link_field), - literal(^tree_field), - ^depth - ), - where: ^where_field, - distinct: a.id, - select: %{ - id: a.id, - arke_id: a.arke_id, - data: a.data, - metadata: a.metadata, - inserted_at: a.inserted_at, - updated_at: a.updated_at, - depth: cte.depth, - link_metadata: cte.metadata, - link_type: cte.type, - starting_unit: cte.starting_unit - } - )) - from x in subquery(q), select: x - end + q = + from( + r in from(a in "arke_unit", + left_join: + cte in fragment( + @raw_cte_query, + literal(^link_field), + literal(^project), + literal(^link_field), + 
^unit_id_list, + literal(^project), + literal(^project), + literal(^project), + literal(^project), + literal(^project), + literal(^project), + literal(^link_field), + literal(^tree_field), + ^depth + ), + where: ^where_field, + distinct: a.id, + select: %{ + id: a.id, + arke_id: a.arke_id, + data: a.data, + metadata: a.metadata, + inserted_at: a.inserted_at, + updated_at: a.updated_at, + depth: cte.depth, + link_metadata: cte.metadata, + link_type: cte.type, + starting_unit: cte.starting_unit + } + ) + ) + from(x in subquery(q), select: x) + end end From bd56b8a08c24d3936c2c10c797f135bc6dc5cf6b Mon Sep 17 00:00:00 2001 From: Ilyich Vismara <81801314+ilyichv@users.noreply.github.com> Date: Wed, 16 Oct 2024 15:16:22 +0200 Subject: [PATCH 07/17] feat: add nested filters and sort (#45) --- lib/arke_postgres/query.ex | 340 +++++++++++++++++++++++++++---------- 1 file changed, 253 insertions(+), 87 deletions(-) diff --git a/lib/arke_postgres/query.ex b/lib/arke_postgres/query.ex index 520cfb8..067c25d 100644 --- a/lib/arke_postgres/query.ex +++ b/lib/arke_postgres/query.ex @@ -23,6 +23,7 @@ defmodule ArkePostgres.Query do action ) do base_query(arke_query, action) + |> handle_paths_join(filters, orders) |> handle_filters(filters) |> handle_orders(orders) |> handle_offset(offset) @@ -49,13 +50,16 @@ defmodule ArkePostgres.Query do def execute(query, :pseudo_query), do: generate_query(query, :pseudo_query) - def get_column(%{data: %{persistence: "arke_parameter"}} = parameter), - do: get_arke_column(parameter) + def get_column(column), do: get_column(column, false) - def get_column(%{data: %{persistence: "table_column"}} = parameter), + def get_column(%{data: %{persistence: "arke_parameter"}} = parameter, joined), + do: get_arke_column(parameter, joined) + + def get_column(%{data: %{persistence: "table_column"}} = parameter, _joined), do: get_table_column(parameter) def remove_arke_system(metadata, project_id) when project_id == :arke_system, do: metadata + def remove_arke_system(metadata, project_id) do case Map.get(metadata, "project") do "arke_system" -> Map.delete(metadata, "project") @@ -95,7 +99,8 @@ defmodule ArkePostgres.Query do units_map = Map.new(unit_list, &{&1.id, &1.metadata}) parameter_links = merge_unit_metadata(parameter_links, units_map, project_id) - parameters = parse_parameters(Enum.filter(unit_list, fn u -> u.arke_id in parameters_id end), project_id) + parameters = + parse_parameters(Enum.filter(unit_list, fn u -> u.arke_id in parameters_id end), project_id) arke_list = parse_arke_list( @@ -145,21 +150,26 @@ defmodule ArkePostgres.Query do end) |> Map.put(:id, id) |> Map.put(:metadata, metadata) - |> Map.update(:parameters,[], fn current -> params ++ current end) + |> Map.update(:parameters, [], fn current -> params ++ current end) [updated_data | new_arke_list] end) end - defp parse_parameters(parameter_list, project_id)do - Enum.reduce(parameter_list, [], fn %{id: id, arke_id: arke_id, metadata: metadata} = unit, new_parameter_list -> + defp parse_parameters(parameter_list, project_id) do + Enum.reduce(parameter_list, [], fn %{id: id, arke_id: arke_id, metadata: metadata} = unit, + new_parameter_list -> parsed_metadata = remove_arke_system(metadata, project_id) - updated_data = Enum.reduce(unit.data,%{}, fn {k,db_data},acc -> Map.put(acc,String.to_atom(k),db_data["value"]) end) - |> Map.put(:id, id) - |> Map.put(:type, arke_id) - |> Map.put(:metadata, parsed_metadata) - [ updated_data | new_parameter_list] + updated_data = + Enum.reduce(unit.data, %{}, fn {k, db_data}, acc 
-> + Map.put(acc, String.to_atom(k), db_data["value"]) + end) + |> Map.put(:id, id) + |> Map.put(:type, arke_id) + |> Map.put(:metadata, parsed_metadata) + + [updated_data | new_parameter_list] end) end @@ -173,19 +183,28 @@ defmodule ArkePostgres.Query do [%{id: String.to_atom(p.child_id), metadata: p.metadata} | new_params] end ) - updated_data = Enum.reduce(unit.data,%{}, fn {k,db_data},acc -> Map.put(acc,String.to_atom(k),db_data["value"]) end) - |> Map.put(:id, id) - |> Map.put(:metadata, metadata) - |> Map.update(:arke_list,[], fn db_arke_list -> - Enum.reduce(db_arke_list,[], fn key,acc -> - case Enum.find(arke_list, fn %{id: id, metadata: _metadata} -> to_string(id) == key end) do - nil -> [key|acc] - data -> - [data|acc] - end - end) - end) - [ updated_data | new_groups] + + updated_data = + Enum.reduce(unit.data, %{}, fn {k, db_data}, acc -> + Map.put(acc, String.to_atom(k), db_data["value"]) + end) + |> Map.put(:id, id) + |> Map.put(:metadata, metadata) + |> Map.update(:arke_list, [], fn db_arke_list -> + Enum.reduce(db_arke_list, [], fn key, acc -> + case Enum.find(arke_list, fn %{id: id, metadata: _metadata} -> + to_string(id) == key + end) do + nil -> + [key | acc] + + data -> + [data | acc] + end + end) + end) + + [updated_data | new_groups] end) end @@ -290,38 +309,80 @@ defmodule ArkePostgres.Query do Arke.Core.Unit.load(arke, record) end + defp handle_paths_join(query, filters, orders) do + paths = extract_paths(filters) ++ extract_paths(orders) + + case paths do + [] -> + query + + _ -> + conditions = + Enum.reduce(paths, nil, fn path, acc -> + condition = dynamic([q, j], ^get_column(List.first(path)) == j.id) + if is_nil(acc), do: condition, else: dynamic([q, j], ^acc or ^condition) + end) + + from(q in query, join: j in "arke_unit", on: ^conditions) + end + end + + defp extract_paths(items) do + items + |> Enum.flat_map(fn + %{base_filters: base_filters} -> Enum.map(base_filters, & &1.path) + %{path: path} -> [path] + _ -> [] + end) + |> Enum.reject(&(is_nil(&1) or length(&1) == 0)) + |> Enum.uniq() + end + def handle_filters(query, filters) do Enum.reduce(filters, query, fn %{logic: logic, negate: negate, base_filters: base_filters}, new_query -> - clause = handle_condition(logic, base_filters) |> handle_negate_condition(negate) + clause = handle_conditions(logic, base_filters) |> handle_negate_condition(negate) from(q in new_query, where: ^clause) end) end - def handle_condition(logic, base_filters) do + defp handle_conditions(logic, base_filters) do Enum.reduce(base_filters, nil, fn %{ parameter: parameter, operator: operator, value: value, - negate: negate + negate: negate, + path: path }, clause -> - column = get_column(parameter) - value = get_value(parameter, value) - - if is_nil(value) or operator == :isnull do - condition = get_nil_query(parameter, column) |> handle_negate_condition(negate) - add_condition_to_clause(condition, clause, logic) + if length(path) == 0 do + parameter_condition(clause, parameter, value, operator, negate, logic) + |> add_condition_to_clause(clause, logic) else - condition = - filter_query_by_operator(parameter, column, value, operator) - |> handle_negate_condition(negate) + # todo enhance to get multi-level path + path_parameter = List.first(path) - add_condition_to_clause(condition, clause, logic) + if not is_nil(path_parameter) do + parameter_condition(clause, parameter, value, operator, negate, logic, true) + |> add_nested_condition_to_clause(clause, logic) + end end end) end + defp parameter_condition(clause, parameter, value, 
operator, negate, logic, joined \\ nil) do + column = get_column(parameter, joined) + value = get_value(parameter, value) + + if is_nil(value) or operator == :isnull do + condition = get_nil_query(parameter, column) |> handle_negate_condition(negate) + else + condition = + filter_query_by_operator(parameter, column, value, operator) + |> handle_negate_condition(negate) + end + end + defp handle_negate_condition(condition, true), do: dynamic([q], not (^condition)) defp handle_negate_condition(condition, false), do: condition @@ -329,10 +390,20 @@ defmodule ArkePostgres.Query do defp add_condition_to_clause(condition, clause, :and), do: dynamic([q], ^clause and ^condition) defp add_condition_to_clause(condition, clause, :or), do: dynamic([q], ^clause or ^condition) + defp add_nested_condition_to_clause(condition, nil, _), do: dynamic([_q, j], ^condition) + + defp add_nested_condition_to_clause(condition, clause, :and), + do: dynamic([_q, j], ^clause and ^condition) + + defp add_nested_condition_to_clause(condition, clause, :or), + do: dynamic([_q, j], ^clause or ^condition) + defp handle_orders(query, orders) do order_by = - Enum.reduce(orders, [], fn %{parameter: parameter, direction: direction}, new_order_by -> - column = get_column(parameter) + Enum.reduce(orders, [], fn %{parameter: parameter, direction: direction, path: path}, + new_order_by -> + joined = length(path) > 0 + column = get_column(parameter, joined) [{direction, column} | new_order_by] end) @@ -347,61 +418,152 @@ defmodule ArkePostgres.Query do defp get_table_column(%{id: id} = _parameter), do: dynamic([q], fragment("?", field(q, ^id))) - defp get_arke_column(%{id: id, data: %{multiple: true}} = _parameter), + defp get_arke_column(%{id: id, data: %{multiple: true}} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? ->> 'value')::jsonb", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, data: %{multiple: true}} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::jsonb", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :string} = _parameter), + defp get_arke_column(%{id: id, arke_id: :string} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::text", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :string} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::text", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :atom} = _parameter), + defp get_arke_column(%{id: id, arke_id: :atom} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::text", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :atom} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::text", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :boolean} = _parameter), + defp get_arke_column(%{id: id, arke_id: :boolean} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::boolean", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :boolean} = _parameter, _joined), do: dynamic( [q], fragment("(? -> ? ->> 'value')::boolean", field(q, :data), ^Atom.to_string(id)) ) - defp get_arke_column(%{id: id, arke_id: :datetime} = _parameter), + defp get_arke_column(%{id: id, arke_id: :datetime} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? 
->> 'value')::timestamp", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :datetime} = _parameter, _joined), do: dynamic( [q], fragment("(? -> ? ->> 'value')::timestamp", field(q, :data), ^Atom.to_string(id)) ) - defp get_arke_column(%{id: id, arke_id: :date} = _parameter), + defp get_arke_column(%{id: id, arke_id: :date} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? ->> 'value')::date", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :date} = _parameter, _joined), do: dynamic( [q], fragment("(? -> ? ->> 'value')::date", field(q, :data), ^Atom.to_string(id)) ) - defp get_arke_column(%{id: id, arke_id: :time} = _parameter), + defp get_arke_column(%{id: id, arke_id: :time} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::time", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :time} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::time", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :integer} = _parameter), + defp get_arke_column(%{id: id, arke_id: :integer} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? ->> 'value')::integer", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :integer} = _parameter, _joined), do: dynamic( [q], fragment("(? -> ? ->> 'value')::integer", field(q, :data), ^Atom.to_string(id)) ) - defp get_arke_column(%{id: id, arke_id: :float} = _parameter), + defp get_arke_column(%{id: id, arke_id: :float} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::float", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :float} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::float", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :dict} = _parameter), + defp get_arke_column(%{id: id, arke_id: :dict} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? ->> 'value')::JSON", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :dict} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::JSON", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :list} = _parameter), + defp get_arke_column(%{id: id, arke_id: :list} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::JSON", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :list} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::JSON", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :link} = _parameter), + defp get_arke_column(%{id: id, arke_id: :link} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? ->> 'value')::text", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :link} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::text", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :dynamic} = _parameter), + defp get_arke_column(%{id: id, arke_id: :dynamic} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? 
->> 'value')::text", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :dynamic} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::text", field(q, :data), ^Atom.to_string(id))) defp get_value(_parameter, value) when is_nil(value), do: value @@ -620,39 +782,43 @@ defmodule ArkePostgres.Query do end defp get_link_query(_action, project, unit_id_list, link_field, tree_field, depth, where_field) do - q = from(r in from(a in "arke_unit", - left_join: - cte in fragment( - @raw_cte_query, - literal(^link_field), - literal(^project), - literal(^link_field), - ^unit_id_list, - literal(^project), - literal(^project), - literal(^project), - literal(^project), - literal(^project), - literal(^project), - literal(^link_field), - literal(^tree_field), - ^depth - ), - where: ^where_field, - distinct: [a.id, cte.starting_unit], - select: %{ - id: a.id, - arke_id: a.arke_id, - data: a.data, - metadata: a.metadata, - inserted_at: a.inserted_at, - updated_at: a.updated_at, - depth: cte.depth, - link_metadata: cte.metadata, - link_type: cte.type, - starting_unit: cte.starting_unit - } - )) - from x in subquery(q), select: x + q = + from( + r in from(a in "arke_unit", + left_join: + cte in fragment( + @raw_cte_query, + literal(^link_field), + literal(^project), + literal(^link_field), + ^unit_id_list, + literal(^project), + literal(^project), + literal(^project), + literal(^project), + literal(^project), + literal(^project), + literal(^link_field), + literal(^tree_field), + ^depth + ), + where: ^where_field, + distinct: [a.id, cte.starting_unit], + select: %{ + id: a.id, + arke_id: a.arke_id, + data: a.data, + metadata: a.metadata, + inserted_at: a.inserted_at, + updated_at: a.updated_at, + depth: cte.depth, + link_metadata: cte.metadata, + link_type: cte.type, + starting_unit: cte.starting_unit + } + ) + ) + + from(x in subquery(q), select: x) end end From df6842898a7835804b5ec13d0ae92eaa45edddb9 Mon Sep 17 00:00:00 2001 From: Ilyich Vismara Date: Tue, 22 Oct 2024 10:55:30 +0200 Subject: [PATCH 08/17] fix: handle_condition --- lib/arke_postgres/query.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/arke_postgres/query.ex b/lib/arke_postgres/query.ex index 067c25d..ddc663c 100644 --- a/lib/arke_postgres/query.ex +++ b/lib/arke_postgres/query.ex @@ -341,12 +341,12 @@ defmodule ArkePostgres.Query do def handle_filters(query, filters) do Enum.reduce(filters, query, fn %{logic: logic, negate: negate, base_filters: base_filters}, new_query -> - clause = handle_conditions(logic, base_filters) |> handle_negate_condition(negate) + clause = handle_condition(logic, base_filters) |> handle_negate_condition(negate) from(q in new_query, where: ^clause) end) end - defp handle_conditions(logic, base_filters) do + def handle_condition(logic, base_filters) do Enum.reduce(base_filters, nil, fn %{ parameter: parameter, operator: operator, From 93f95d5c345f8b6f91c06192c9b255f7dcd7d924 Mon Sep 17 00:00:00 2001 From: Ilyich Vismara <81801314+ilyichv@users.noreply.github.com> Date: Mon, 10 Feb 2025 17:54:21 +0100 Subject: [PATCH 09/17] wip: add topology bulk operations (#47) --- lib/arke_postgres.ex | 51 ++++++-- lib/arke_postgres/arke_link.ex | 117 ++++++++++++++++++ lib/arke_postgres/query.ex | 91 ++++++++------ lib/arke_postgres/tables/arke_link.ex | 10 +- lib/arke_postgres/tables/arke_unit.ex | 2 + .../20220610104406_initial_migration.exs | 28 +++-- 6 files changed, 239 insertions(+), 60 deletions(-) create mode 100644 
lib/arke_postgres/arke_link.ex diff --git a/lib/arke_postgres.ex b/lib/arke_postgres.ex index bee6e70..a67282d 100644 --- a/lib/arke_postgres.ex +++ b/lib/arke_postgres.ex @@ -14,14 +14,16 @@ defmodule ArkePostgres do alias Arke.Boundary.{GroupManager, ArkeManager} - alias ArkePostgres.{Table, ArkeUnit, Query} + alias ArkePostgres.{Table, ArkeUnit, ArkeLink, Query} def init() do case check_env() do {:ok, nil} -> try do + projects = + Query.get_project_record() + |> Enum.sort_by(&(to_string(&1.id) == "arke_system"), :desc) - projects =Query.get_project_record() |> Enum.sort_by(&(to_string(&1.id) == "arke_system"),:desc) Enum.each(projects, fn %{id: project_id} = _project -> start_managers(project_id) end) @@ -29,9 +31,14 @@ defmodule ArkePostgres do :ok rescue err in DBConnection.ConnectionError -> - %{message: message,reason: reason} = err - parsed_message = %{context: "db_connection_error", message: "error: #{err}, msg: #{message}"} - IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ]) + %{message: message, reason: reason} = err + + parsed_message = %{ + context: "db_connection_error", + message: "error: #{err}, msg: #{message}" + } + + IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan]) :error err in Postgrex.Error -> @@ -112,6 +119,14 @@ defmodule ArkePostgres do end end + defp handle_create( + project, + %{id: :arke_link} = arke, + unit_list, + opts + ), + do: ArkeLink.insert(project, arke, unit_list, opts) + defp handle_create( project, %{data: %{type: "table"}} = arke, @@ -120,6 +135,7 @@ defmodule ArkePostgres do ) do # todo: handle bulk? # todo: remove once the project is not needed anymore + data = data |> Map.merge(%{metadata: Map.delete(metadata, :project)}) |> data_as_klist Table.insert(project, arke, data) {:ok, unit} @@ -144,6 +160,14 @@ defmodule ArkePostgres do handle_update(project, arke, unit_list, opts) end + def handle_update( + project, + %{id: :arke_link} = arke, + unit_list, + opts + ), + do: ArkeLink.update(project, arke, unit_list, opts) + def handle_update( project, %{data: %{type: "table"}} = arke, @@ -182,6 +206,13 @@ defmodule ArkePostgres do handle_delete(project, arke, unit_list) end + defp handle_delete( + project, + %{id: :arke_link} = arke, + unit_list + ), + do: ArkeLink.delete(project, arke, unit_list) + defp handle_delete( project, %{data: %{type: "table"}} = arke, @@ -257,15 +288,17 @@ defmodule ArkePostgres do IO.inspect("DBConnection.ConnectionError") %{message: message} = err parsed_message = %{context: "db_connection_error", message: "#{message}"} - IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ]) + IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan]) :error + err in Postgrex.Error -> IO.inspect("Postgrex.Error") - %{message: message,postgres: %{code: code, message: postgres_message}} = err + %{message: message, postgres: %{code: code, message: postgres_message}} = err parsed_message = %{context: "postgrex_error", message: "#{message || postgres_message}"} - IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ]) + IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan]) :error - err -> + + err -> IO.inspect("uncatched error") IO.inspect(err) :error diff --git a/lib/arke_postgres/arke_link.ex b/lib/arke_postgres/arke_link.ex new file mode 100644 index 0000000..4782c1b --- /dev/null +++ b/lib/arke_postgres/arke_link.ex @@ -0,0 +1,117 @@ +# Copyright 2023 Arkemis S.r.l. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule ArkePostgres.ArkeLink do + import Ecto.Query, only: [from: 2, dynamic: 2, dynamic: 1] + alias Arke.Utils.ErrorGenerator, as: Error + + def get_all(project, schema, fields, where \\ []) do + query = from(ArkePostgres.Tables.ArkeLink, select: ^fields, where: ^where) + ArkePostgres.Repo.all(query, prefix: project) + end + + def get_by(project, schema, fields, where) do + query = from(ArkePostgres.Tables.ArkeLink, select: ^fields, where: ^where) + ArkePostgres.Repo.one(query, prefix: project) + end + + def insert(project, schema, data, opts \\ []) do + records = + Enum.map(data, fn unit -> + %{ + parent_id: Map.get(unit.data, :parent_id), + child_id: Map.get(unit.data, :child_id), + type: Map.get(unit.data, :type), + metadata: Map.get(unit, :metadata) + } + end) + + case ArkePostgres.Repo.insert_all(ArkePostgres.Tables.ArkeLink, records, + prefix: project, + returning: [:child_id, :parent_id, :type] + ) do + {0, _} -> + {:error, Error.create(:insert, "no records inserted")} + + {count, inserted} -> + inserted_keys = + Enum.map(inserted, fn link -> {link.type, link.parent_id, link.child_id} end) + + {valid, errors} = + Enum.split_with(data, fn unit -> + {unit.data[:type], unit.data[:parent_id], unit.data[:child_id]} in inserted_keys + end) + + case opts[:bulk] do + true -> {:ok, count, valid, errors} + _ -> {:ok, List.first(valid)} + end + end + end + + def update(project, schema, data, opts \\ []) do + records = + Enum.map(data, fn unit -> + %{ + parent_id: Map.get(unit.data, :parent_id), + child_id: Map.get(unit.data, :child_id), + type: Map.get(unit.data, :type), + metadata: Map.delete(Map.get(unit, :metadata), :project) + } + end) + + case ArkePostgres.Repo.insert_all(ArkePostgres.Tables.ArkeLink, records, + prefix: project, + returning: [:child_id, :parent_id, :type], + on_conflict: {:replace_all_except, [:type, :parent_id, :child_id, :id]}, + conflict_target: [:type, :parent_id, :child_id] + ) do + {0, _} -> + {:error, Error.create(:insert, "no records inserted")} + + {count, updated} -> + updated_keys = + Enum.map(updated, fn link -> {link.type, link.parent_id, link.child_id} end) + + {valid, errors} = + Enum.split_with(data, fn unit -> + {unit.data[:type], unit.data[:parent_id], unit.data[:child_id]} in updated_keys + end) + + case opts[:bulk] do + true -> {:ok, count, valid, errors} + _ -> {:ok, List.first(valid)} + end + end + end + + def delete(project, schema, unit_list) do + where = + Enum.reduce(unit_list, dynamic(false), fn unit, dynamic -> + dynamic( + [a], + ^dynamic or + (a.parent_id == ^unit.data.parent_id and a.child_id == ^unit.data.child_id and + a.type == ^unit.data.type) + ) + end) + + query = from(ArkePostgres.Tables.ArkeLink, where: ^where) + + case ArkePostgres.Repo.delete_all(query, prefix: project) do + {0, nil} -> Error.create(:delete, "item not found") + _ -> {:ok, nil} + end + end +end diff --git a/lib/arke_postgres/query.ex b/lib/arke_postgres/query.ex index ddc663c..3f61b80 100644 --- 
a/lib/arke_postgres/query.ex +++ b/lib/arke_postgres/query.ex @@ -14,6 +14,7 @@ defmodule ArkePostgres.Query do import Ecto.Query + require IEx alias Arke.Utils.DatetimeHandler, as: DatetimeHandler @record_fields [:id, :arke_id, :data, :metadata, :inserted_at, :updated_at] @@ -327,46 +328,61 @@ defmodule ArkePostgres.Query do end end - defp extract_paths(items) do - items - |> Enum.flat_map(fn - %{base_filters: base_filters} -> Enum.map(base_filters, & &1.path) - %{path: path} -> [path] - _ -> [] - end) + defp extract_paths(filters) do + filters + |> Enum.flat_map(&extract_path/1) |> Enum.reject(&(is_nil(&1) or length(&1) == 0)) |> Enum.uniq() end + def extract_path(%Arke.Core.Query.Filter{base_filters: base_filters}), + do: Enum.flat_map(base_filters, &extract_path/1) + + def extract_path(%Arke.Core.Query.BaseFilter{path: path}), do: [path] + def extract_path(_), do: [] + def handle_filters(query, filters) do Enum.reduce(filters, query, fn %{logic: logic, negate: negate, base_filters: base_filters}, - new_query -> - clause = handle_condition(logic, base_filters) |> handle_negate_condition(negate) - from(q in new_query, where: ^clause) + query -> + clause = build_filter_clause(logic, base_filters) |> handle_negate_condition(negate) + from(q in query, where: ^clause) end) end - def handle_condition(logic, base_filters) do - Enum.reduce(base_filters, nil, fn %{ - parameter: parameter, - operator: operator, - value: value, - negate: negate, - path: path - }, - clause -> - if length(path) == 0 do - parameter_condition(clause, parameter, value, operator, negate, logic) - |> add_condition_to_clause(clause, logic) - else - # todo enhance to get multi-level path - path_parameter = List.first(path) - - if not is_nil(path_parameter) do - parameter_condition(clause, parameter, value, operator, negate, logic, true) - |> add_nested_condition_to_clause(clause, logic) - end - end + defp build_filter_clause(parent_logic, filters), + do: build_filter_clause(nil, parent_logic, filters) + + defp build_filter_clause(clause, parent_logic, filters) do + Enum.reduce(filters, clause, fn + %Arke.Core.Query.Filter{logic: logic, base_filters: nested_filters}, clause -> + nested_clause = build_filter_clause(nil, logic, nested_filters) + add_condition_to_clause(nested_clause, clause, parent_logic) + + %Arke.Core.Query.BaseFilter{ + parameter: parameter, + operator: operator, + value: value, + negate: negate, + path: [] + }, + clause -> + parameter_condition(clause, parameter, value, operator, negate, parent_logic) + |> add_condition_to_clause(clause, parent_logic) + + %Arke.Core.Query.BaseFilter{ + parameter: parameter, + operator: operator, + value: value, + negate: negate, + path: [path_parameter | _] + }, + clause + when not is_nil(path_parameter) -> + parameter_condition(clause, parameter, value, operator, negate, parent_logic, true) + |> add_nested_condition_to_clause(clause, parent_logic) + + _, clause -> + clause end) end @@ -374,13 +390,14 @@ defmodule ArkePostgres.Query do column = get_column(parameter, joined) value = get_value(parameter, value) - if is_nil(value) or operator == :isnull do - condition = get_nil_query(parameter, column) |> handle_negate_condition(negate) - else - condition = + condition = + if is_nil(value) or operator == :isnull do + get_nil_query(parameter, column) + else filter_query_by_operator(parameter, column, value, operator) - |> handle_negate_condition(negate) - end + end + + handle_negate_condition(condition, negate) end defp handle_negate_condition(condition, true), do: 
dynamic([q], not (^condition))
diff --git a/lib/arke_postgres/tables/arke_link.ex b/lib/arke_postgres/tables/arke_link.ex
index b7558c5..d48e986 100644
--- a/lib/arke_postgres/tables/arke_link.ex
+++ b/lib/arke_postgres/tables/arke_link.ex
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-defmodule ArkePostgres.ArkeLink do
+defmodule ArkePostgres.Tables.ArkeLink do
   use Ecto.Schema
   import Ecto.Changeset
 
@@ -20,12 +20,14 @@ defmodule ArkePostgres.ArkeLink do
   @foreign_key_type :string
 
   schema "arke_link" do
-    field(:type, :string, default: "link")
-    belongs_to(:parent_id, ArkePostgres.ArkeUnit, primary_key: true)
-    belongs_to(:child_id, ArkePostgres.ArkeUnit, primary_key: true)
+    field(:type, :string, default: "link", primary_key: true)
+    belongs_to(:parent, ArkePostgres.ArkeUnit, primary_key: true, foreign_key: :parent_id)
+    belongs_to(:child, ArkePostgres.ArkeUnit, primary_key: true, foreign_key: :child_id)
     field(:metadata, :map, default: %{})
   end
 
+  # TODO: add insert_all validation
+
   def changeset(args \\ []) do
     %__MODULE__{}
     |> cast(args, @arke_link_fields)
diff --git a/lib/arke_postgres/tables/arke_unit.ex b/lib/arke_postgres/tables/arke_unit.ex
index f2823a5..b9427c2 100644
--- a/lib/arke_postgres/tables/arke_unit.ex
+++ b/lib/arke_postgres/tables/arke_unit.ex
@@ -27,6 +27,8 @@ defmodule ArkePostgres.Tables.ArkeUnit do
     timestamps()
   end
 
+  # TODO: add insert_all validation
+
   def changeset(args \\ []) do
     %__MODULE__{}
     |> cast(args, @arke_record_fields)
diff --git a/priv/repo/migrations/20220610104406_initial_migration.exs b/priv/repo/migrations/20220610104406_initial_migration.exs
index 4d6a099..2a3c55c 100644
--- a/priv/repo/migrations/20220610104406_initial_migration.exs
+++ b/priv/repo/migrations/20220610104406_initial_migration.exs
@@ -3,21 +3,29 @@ defmodule ArkePostgres.Repo.Migrations.InitialMigration do
 
   def change do
     create table(:arke_unit, primary_key: false) do
-      add :id, :string, primary_key: true
-      add :arke_id, :string, null: false
-      add :data, :map, default: %{}, null: false
-      add :metadata, :map, default: %{}, null: false
+      add(:id, :string, primary_key: true)
+      add(:arke_id, :string, null: false)
+      add(:data, :map, default: %{}, null: false)
+      add(:metadata, :map, default: %{}, null: false)
       timestamps()
     end
 
     create table(:arke_link, primary_key: false) do
-      add :type, :string, default: "link", null: false
-      add :parent_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all), primary_key: true
-      add :child_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all), primary_key: true
-      add :metadata, :map, default: %{}
+      add(:type, :string, default: "link", null: false, primary_key: true)
+
+      add(:parent_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all),
+        primary_key: true
+      )
+
+      add(:child_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all),
+        primary_key: true
+      )
+
+      add(:metadata, :map, default: %{})
    end
 
-    create index(:arke_link, :parent_id)
-    create index(:arke_link, :child_id)
+    create(index(:arke_unit, :arke_id))
+    create(index(:arke_link, :parent_id))
+    create(index(:arke_link, :child_id))
   end
 end
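One consequence of the schema change above worth spelling out: with `type` folded into the composite primary key, `(type, parent_id, child_id)` becomes a valid `conflict_target`, which is what the link upsert in `ArkePostgres.ArkeLink.update/4` relies on. A sketch of the resulting semantics, with illustrative values, where `{:replace, [:metadata]}` stands in for the patch's `replace_all_except` option:

    row = %{type: "link", parent_id: "unit_a", child_id: "unit_b", metadata: %{order: 1}}

    # the first call inserts; repeating it with new metadata updates the
    # existing row in place instead of raising a unique-constraint error
    ArkePostgres.Repo.insert_all(ArkePostgres.Tables.ArkeLink, [row],
      prefix: "my_project",
      on_conflict: {:replace, [:metadata]},
      conflict_target: [:type, :parent_id, :child_id]
    )

From 161c014cf2d3de33c0f66566285e81d8823f25f0 Mon Sep 17 00:00:00 2001
From: Ilyich Vismara
Date: Fri, 18 Apr 2025 12:10:27 +0200
Subject: [PATCH 10/17] wip

---
 lib/arke_postgres/query.ex | 109 ++++++++++++++++++++-----------
 1 file changed, 59 insertions(+), 50 deletions(-)

diff --git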
a/lib/arke_postgres/query.ex b/lib/arke_postgres/query.ex index 3f61b80..ddff28e 100644 --- a/lib/arke_postgres/query.ex +++ b/lib/arke_postgres/query.ex @@ -23,8 +23,10 @@ defmodule ArkePostgres.Query do %{filters: filters, orders: orders, offset: offset, limit: limit} = arke_query, action ) do + paths = extract_paths(filters) ++ extract_paths(orders) + base_query(arke_query, action) - |> handle_paths_join(filters, orders) + |> handle_paths_join(paths) |> handle_filters(filters) |> handle_orders(orders) |> handle_offset(offset) @@ -310,26 +312,20 @@ defmodule ArkePostgres.Query do Arke.Core.Unit.load(arke, record) end - defp handle_paths_join(query, filters, orders) do - paths = extract_paths(filters) ++ extract_paths(orders) + defp handle_paths_join(query, []), do: query - case paths do - [] -> - query - - _ -> - conditions = - Enum.reduce(paths, nil, fn path, acc -> - condition = dynamic([q, j], ^get_column(List.first(path)) == j.id) - if is_nil(acc), do: condition, else: dynamic([q, j], ^acc or ^condition) - end) + defp handle_paths_join(query, paths) do + conditions = + Enum.reduce(paths, nil, fn path, acc -> + condition = dynamic([q, j], ^get_column(List.first(path)) == j.id) + if is_nil(acc), do: condition, else: dynamic([q, j], ^acc or ^condition) + end) - from(q in query, join: j in "arke_unit", on: ^conditions) - end + from(q in query, join: j in "arke_unit", on: ^conditions) end - defp extract_paths(filters) do - filters + defp extract_paths(items) do + items |> Enum.flat_map(&extract_path/1) |> Enum.reject(&(is_nil(&1) or length(&1) == 0)) |> Enum.uniq() @@ -339,6 +335,8 @@ defmodule ArkePostgres.Query do do: Enum.flat_map(base_filters, &extract_path/1) def extract_path(%Arke.Core.Query.BaseFilter{path: path}), do: [path] + def extract_path(%Arke.Core.Query.Order{path: path}), do: [path] + def extract_path(_), do: [] def handle_filters(query, filters) do @@ -352,39 +350,50 @@ defmodule ArkePostgres.Query do defp build_filter_clause(parent_logic, filters), do: build_filter_clause(nil, parent_logic, filters) - defp build_filter_clause(clause, parent_logic, filters) do - Enum.reduce(filters, clause, fn - %Arke.Core.Query.Filter{logic: logic, base_filters: nested_filters}, clause -> - nested_clause = build_filter_clause(nil, logic, nested_filters) - add_condition_to_clause(nested_clause, clause, parent_logic) - - %Arke.Core.Query.BaseFilter{ - parameter: parameter, - operator: operator, - value: value, - negate: negate, - path: [] - }, - clause -> - parameter_condition(clause, parameter, value, operator, negate, parent_logic) - |> add_condition_to_clause(clause, parent_logic) - - %Arke.Core.Query.BaseFilter{ - parameter: parameter, - operator: operator, - value: value, - negate: negate, - path: [path_parameter | _] - }, - clause - when not is_nil(path_parameter) -> - parameter_condition(clause, parameter, value, operator, negate, parent_logic, true) - |> add_nested_condition_to_clause(clause, parent_logic) - - _, clause -> - clause - end) - end + defp build_filter_clause(clause, parent_logic, filters), + do: Enum.reduce(filters, clause, &handle_clause(&1, &2, parent_logic)) + + defp handle_clause( + %Arke.Core.Query.Filter{logic: logic, base_filters: nested_filters}, + clause, + parent_logic + ), + do: + build_filter_clause(nil, logic, nested_filters) + |> add_condition_to_clause(clause, parent_logic) + + defp handle_clause( + %Arke.Core.Query.BaseFilter{ + parameter: parameter, + operator: operator, + value: value, + negate: negate, + path: [] + }, + clause, + parent_logic + ), + 
do:
+      parameter_condition(clause, parameter, value, operator, negate, parent_logic)
+      |> add_condition_to_clause(clause, parent_logic)
+
+  defp handle_clause(
+         %Arke.Core.Query.BaseFilter{
+           parameter: parameter,
+           operator: operator,
+           value: value,
+           negate: negate,
+           path: [path_parameter | _]
+         },
+         clause,
+         parent_logic
+       )
+       when not is_nil(path_parameter),
+       do:
+         parameter_condition(clause, parameter, value, operator, negate, parent_logic, true)
+         |> add_nested_condition_to_clause(clause, parent_logic)
+
+  defp handle_clause(_, clause, _), do: clause
 
   defp parameter_condition(clause, parameter, value, operator, negate, logic, joined \\ nil) do
     column = get_column(parameter, joined)

From 350490f882177f21e576ab54c12637464e4e264b Mon Sep 17 00:00:00 2001
From: Ilyich Vismara
Date: Wed, 23 Apr 2025 09:57:31 +0200
Subject: [PATCH 11/17] fix: use left join for nested filter

---
 lib/arke_postgres/query.ex | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/arke_postgres/query.ex b/lib/arke_postgres/query.ex
index ddff28e..60eea22 100644
--- a/lib/arke_postgres/query.ex
+++ b/lib/arke_postgres/query.ex
@@ -321,7 +321,7 @@ defmodule ArkePostgres.Query do
         if is_nil(acc), do: condition, else: dynamic([q, j], ^acc or ^condition)
       end)
 
-    from(q in query, join: j in "arke_unit", on: ^conditions)
+    from(q in query, left_join: j in "arke_unit", on: ^conditions)
   end
 
   defp extract_paths(items) do
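The rationale for the left join, inferred rather than stated by the commit: with an inner join, units whose nested path has no matching `arke_unit` row are dropped before any condition runs, so negative and is-null checks on joined parameters can never match. A left join keeps those rows with NULL joined columns. A schemaless sketch, where the `city` key and fragment-based field access are illustrative:

    import Ecto.Query

    # with join: the unit vanishes when no linked record exists;
    # with left_join: it survives and j.id is NULL, so is_nil can match
    from(q in "arke_unit",
      left_join: j in "arke_unit",
      on: fragment("?->>'city'", q.data) == j.id,
      where: is_nil(j.id),
      select: q.id
    )

From a31e337762dccf6410ef1b12df628c0a8f648559 Mon Sep 17 00:00:00 2001
From: Ilyich Vismara
Date: Mon, 28 Apr 2025 13:38:22 +0200
Subject: [PATCH 12/17] fix: is_nil filter

---
 lib/arke_postgres/query.ex | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/lib/arke_postgres/query.ex b/lib/arke_postgres/query.ex
index 60eea22..8523ece 100644
--- a/lib/arke_postgres/query.ex
+++ b/lib/arke_postgres/query.ex
@@ -401,7 +401,7 @@ defmodule ArkePostgres.Query do
 
     condition =
       if is_nil(value) or operator == :isnull do
-        get_nil_query(parameter, column)
+        get_nil_query(parameter, column, negate, joined)
       else
         filter_query_by_operator(parameter, column, value, operator)
       end
@@ -682,11 +682,25 @@
     end
   end
 
-  defp get_nil_query(%{id: id} = _parameter, column),
-    do:
-      dynamic(
-        [q],
-        fragment("? IS NULL AND (data \? ?)", ^column, ^Atom.to_string(id))
-      )
+  defp get_nil_query(%{id: id} = _parameter, column, false, false),
+    do:
+      dynamic(
+        [q],
+        fragment("? IS NULL AND (?.data \? ?)", ^column, q, ^Atom.to_string(id))
+      )
+
+  defp get_nil_query(%{id: id} = _parameter, column, false, true),
+    do:
+      dynamic(
+        [q, ..., j],
+        fragment("? IS NULL AND (?.data \? 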
?)", ^column, j, ^Atom.to_string(id)) + ) + + defp get_nil_query(%{id: id} = _parameter, column, true, _joined), + do: + dynamic( + [q], + is_nil(^column) ) defp filter_query_by_operator(%{data: %{multiple: true}}, column, value, :eq), From 396ccec0b8b04f75e81fc6a1986779787166d97a Mon Sep 17 00:00:00 2001 From: Ilyich Vismara <81801314+ilyichv@users.noreply.github.com> Date: Mon, 23 Jun 2025 10:20:53 +0200 Subject: [PATCH 13/17] fix: bulk chunking (#46) --- lib/arke_postgres/arke_unit.ex | 82 +++++++++++++++++++++++----------- 1 file changed, 55 insertions(+), 27 deletions(-) diff --git a/lib/arke_postgres/arke_unit.ex b/lib/arke_postgres/arke_unit.ex index 61b678e..3fca716 100644 --- a/lib/arke_postgres/arke_unit.ex +++ b/lib/arke_postgres/arke_unit.ex @@ -18,6 +18,7 @@ defmodule ArkePostgres.ArkeUnit do alias Arke.Utils.ErrorGenerator, as: Error @record_fields [:id, :data, :metadata, :inserted_at, :updated_at] + @chunk_size 5000 def insert(project, arke, unit_list, opts \\ []) do now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second) @@ -44,19 +45,26 @@ defmodule ArkePostgres.ArkeUnit do ]) end) - case( - ArkePostgres.Repo.insert_all( - ArkePostgres.Tables.ArkeUnit, - records, - prefix: project, - returning: true - ) - ) do - {0, _} -> + {total_count, all_inserted} = + Stream.chunk_every(records, @chunk_size) + |> Enum.reduce({0, []}, fn chunk, {count_acc, inserted_acc} -> + case ArkePostgres.Repo.insert_all( + ArkePostgres.Tables.ArkeUnit, + chunk, + prefix: project, + returning: true + ) do + {chunk_count, chunk_inserted} -> + {count_acc + chunk_count, inserted_acc ++ chunk_inserted} + end + end) + + case total_count do + 0 -> {:error, Error.create(:insert, "no records inserted")} - {count, inserted} -> - inserted_ids = Enum.map(inserted, & &1.id) + count -> + inserted_ids = Enum.map(all_inserted, & &1.id) {valid, errors} = Enum.split_with(updated_unit_list, fn unit -> @@ -89,19 +97,28 @@ defmodule ArkePostgres.ArkeUnit do } end) - case ArkePostgres.Repo.insert_all( - ArkePostgres.Tables.ArkeUnit, - records, - prefix: project, - on_conflict: {:replace_all_except, [:id]}, - conflict_target: :id, - returning: true - ) do - {0, _} -> + {total_count, all_updated} = + Stream.chunk_every(records, @chunk_size) + |> Enum.reduce({0, []}, fn chunk, {count_acc, updated_acc} -> + case ArkePostgres.Repo.insert_all( + ArkePostgres.Tables.ArkeUnit, + chunk, + prefix: project, + on_conflict: {:replace_all_except, [:id]}, + conflict_target: :id, + returning: true + ) do + {chunk_count, chunk_updated} -> + {count_acc + chunk_count, updated_acc ++ chunk_updated} + end + end) + + case total_count do + 0 -> {:error, Error.create(:update, "no records updated")} - {count, updated} -> - updated_ids = Enum.map(updated, & &1.id) + count -> + updated_ids = Enum.map(all_updated, & &1.id) {valid, errors} = Enum.split_with(unit_list, fn unit -> @@ -122,12 +139,23 @@ defmodule ArkePostgres.ArkeUnit do where: a.id in ^Enum.map(unit_list, &Atom.to_string(&1.id)) ) - case ArkePostgres.Repo.delete_all(query, prefix: project) do - {0, nil} -> - Error.create(:delete, "item not found") + {total_count, _} = + Stream.chunk_every(Enum.map(unit_list, &Atom.to_string(&1.id)), @chunk_size) + |> Enum.reduce({0, nil}, fn chunk, {count_acc, _} -> + query = + from(a in "arke_unit", + where: a.arke_id == ^Atom.to_string(arke.id), + where: a.id in ^chunk + ) + + case ArkePostgres.Repo.delete_all(query, prefix: project) do + {chunk_count, nil} -> {count_acc + chunk_count, nil} + end + end) - _ -> - {:ok, nil} + case 
total_count do
+      0 -> Error.create(:delete, "item not found")
+      _ -> {:ok, nil}
     end
   end
 

From f06d23d310d4e07fab94851473d69b2edeee61a0 Mon Sep 17 00:00:00 2001
From: Ilyich Vismara
Date: Mon, 23 Jun 2025 10:36:53 +0200
Subject: [PATCH 14/17] chore: update action

---
 .github/workflows/publish_hex.yml | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/publish_hex.yml b/.github/workflows/publish_hex.yml
index d7e4d59..c7fcc19 100644
--- a/.github/workflows/publish_hex.yml
+++ b/.github/workflows/publish_hex.yml
@@ -3,6 +3,7 @@ on:
   push:
     tags:
       - "v[0-9]+.[0-9]+.[0-9]+"
+      - "v[0-9]+.[0-9]+.[0-9]*"
 
 jobs:
   publish:
@@ -14,8 +15,8 @@ jobs:
       - name: Set up Elixir
         uses: erlef/setup-beam@v1
         with:
-          elixir-version: '1.13.4'
-          otp-version: '24.3'
+          elixir-version: "1.13.4"
+          otp-version: "24.3"
       - name: Restore dependencies cache
         uses: actions/cache@v3.3.1
         with:
@@ -31,4 +32,4 @@ jobs:
       - name: Release & Publish
         run: mix hex.publish --yes
         env:
-          HEX_API_KEY: ${{ secrets.HEX_API_KEY }}
\ No newline at end of file
+          HEX_API_KEY: ${{ secrets.HEX_API_KEY }}

From 04dadccade0a92a576b9589822cae2ff4393a45b Mon Sep 17 00:00:00 2001
From: Ilyich Vismara
Date: Mon, 23 Jun 2025 10:47:25 +0200
Subject: [PATCH 15/17] chore: update actions/checkout to v4

---
 .github/workflows/publish_hex.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/publish_hex.yml b/.github/workflows/publish_hex.yml
index c7fcc19..0c75f61 100644
--- a/.github/workflows/publish_hex.yml
+++ b/.github/workflows/publish_hex.yml
@@ -10,7 +10,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
 
       - name: Set up Elixir
         uses: erlef/setup-beam@v1

From ef3bef6569309b488457e682a93e3e6c3bf7b70f Mon Sep 17 00:00:00 2001
From: Ilyich Vismara
Date: Mon, 23 Jun 2025 10:49:02 +0200
Subject: [PATCH 16/17] chore: update actions/cache to v4

---
 .github/workflows/publish_hex.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/publish_hex.yml b/.github/workflows/publish_hex.yml
index 0c75f61..846664f 100644
--- a/.github/workflows/publish_hex.yml
+++ b/.github/workflows/publish_hex.yml
@@ -18,7 +18,7 @@ jobs:
       - name: Restore dependencies cache
-        uses: actions/cache@v3.3.1
+        uses: actions/cache@v4
         with:
           path: deps
           key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }}

From cb559a346aad82a4f6d3286f654162ad72f13d23 Mon Sep 17 00:00:00 2001
From: Ilyich Vismara
Date: Mon, 23 Jun 2025 10:54:48 +0200
Subject: [PATCH 17/17] fix: version

---
 .github/workflows/publish_hex.yml | 1 -
 mix.exs                           | 3 ++-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/publish_hex.yml b/.github/workflows/publish_hex.yml
index 846664f..d156c2c 100644
--- a/.github/workflows/publish_hex.yml
+++ b/.github/workflows/publish_hex.yml
@@ -2,7 +2,6 @@ name: Publish package to Hex 📦
 on:
   push:
     tags:
-      - "v[0-9]+.[0-9]+.[0-9]+"
       - "v[0-9]+.[0-9]+.[0-9]*"
 
 jobs:
diff --git a/mix.exs b/mix.exs
index c3c524c..c7dcca3 100644
--- a/mix.exs
+++ b/mix.exs
@@ -1,7 +1,7 @@
 defmodule ArkePostgres.MixProject do
   use Mix.Project
 
-  @version "0.3.7"
+  @version "0.4.0-bulk.0"
 
   @scm_url "https://github.com/arkemishub/arke-postgres"
   @site_url "https://arkehub.com"
@@ -31,6 +31,7 @@ defmodule ArkePostgres.MixProject do
       mod: {ArkePostgres.Application, []}
     ]
   end
+
   defp versioning do
     [
       tag_prefix: "v",