diff --git a/.github/workflows/publish_hex.yml b/.github/workflows/publish_hex.yml index d7e4d59..d156c2c 100644 --- a/.github/workflows/publish_hex.yml +++ b/.github/workflows/publish_hex.yml @@ -2,22 +2,22 @@ name: Publish package to Hex 📦 on: push: tags: - - "v[0-9]+.[0-9]+.[0-9]+" + - "v[0-9]+.[0-9]+.[0-9]+*" jobs: publish: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Elixir uses: erlef/setup-beam@v1 with: - elixir-version: '1.13.4' - otp-version: '24.3' + elixir-version: "1.13.4" + otp-version: "24.3" - name: Restore dependencies cache - uses: actions/cache@v3.3.1 + uses: actions/cache@v4 with: path: deps key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }} @@ -31,4 +31,4 @@ jobs: - name: Release & Publish run: mix hex.publish --yes env: - HEX_API_KEY: ${{ secrets.HEX_API_KEY }} \ No newline at end of file + HEX_API_KEY: ${{ secrets.HEX_API_KEY }} diff --git a/lib/arke_postgres.ex b/lib/arke_postgres.ex index 4b1f53d..a67282d 100644 --- a/lib/arke_postgres.ex +++ b/lib/arke_postgres.ex @@ -14,14 +14,16 @@ defmodule ArkePostgres do alias Arke.Boundary.{GroupManager, ArkeManager} - alias ArkePostgres.{Table, ArkeUnit, Query} + alias ArkePostgres.{Table, ArkeUnit, ArkeLink, Query} def init() do case check_env() do {:ok, nil} -> try do + projects = + Query.get_project_record() + |> Enum.sort_by(&(to_string(&1.id) == "arke_system"), :desc) - projects =Query.get_project_record() |> Enum.sort_by(&(to_string(&1.id) == "arke_system"),:desc) Enum.each(projects, fn %{id: project_id} = _project -> start_managers(project_id) end) @@ -29,14 +31,25 @@ defmodule ArkePostgres do :ok rescue err in DBConnection.ConnectionError -> - %{message: message,reason: reason} = err - parsed_message = %{context: "db_connection_error", message: "error: #{err}, msg: #{message}"} - IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ]) + %{message: message, reason: _reason} = err + + parsed_message = %{ + context: "db_connection_error", + message: "error: #{err}, msg: #{message}" + } + + IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan]) :error + err in Postgrex.Error -> - %{message: message,postgres: %{code: code, message: postgres_message}} = err - parsed_message = %{context: "postgrex_error", message: "#{message || postgres_message}"} - IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ]) + %{message: message, postgres: %{code: _code, message: postgres_message}} = err + + parsed_message = %{ + context: "postgrex_error", + message: "#{message || postgres_message}" + } + + IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan]) :error end @@ -69,64 +82,99 @@ defmodule ArkePostgres do end end - defp start_managers(project_id) when is_binary(project_id), do: start_managers(String.to_atom(project_id)) + defp start_managers(project_id) when is_binary(project_id), + do: start_managers(String.to_atom(project_id)) + defp start_managers(project_id) do {parameters, arke_list, groups} = Query.get_manager_units(project_id) - Arke.handle_manager(parameters,project_id,:parameter) - Arke.handle_manager(arke_list,project_id,:arke) - Arke.handle_manager(groups,project_id,:group) - + Arke.handle_manager(parameters, project_id, :parameter) + Arke.handle_manager(arke_list, project_id, :arke) + Arke.handle_manager(groups, project_id, :group) end - def create(project, %{arke_id: arke_id} = unit) do + def create(project, unit, opts \\ []) + + def create(project, %{arke_id: _arke_id} = unit, opts),
+ do: create(project, [unit], opts) + + def create(_project, [], _opts), do: {:ok, 0, [], []} + + def create(project, [%{arke_id: arke_id} | _] = unit_list, opts) do arke = Arke.Boundary.ArkeManager.get(arke_id, project) - case handle_create(project, arke, unit) do + + case handle_create(project, arke, unit_list, opts) do {:ok, unit} -> {:ok, Arke.Core.Unit.update(unit, metadata: Map.merge(unit.metadata, %{project: project}))} + {:ok, count, valid, errors} -> + {:ok, count, + Enum.map(valid, fn unit -> + Arke.Core.Unit.update(unit, metadata: Map.merge(unit.metadata, %{project: project})) + end), errors} + {:error, errors} -> - {:error, handle_changeset_errros(errors)} + {:error, errors} end end + defp handle_create( + project, + %{id: :arke_link} = arke, + unit_list, + opts + ), + do: ArkeLink.insert(project, arke, unit_list, opts) + defp handle_create( project, %{data: %{type: "table"}} = arke, - %{data: data, metadata: metadata} = unit + [%{data: data, metadata: metadata} = unit | _], + _opts ) do + # todo: handle bulk? # todo: remove once the project is not needed anymore + data = data |> Map.merge(%{metadata: Map.delete(metadata, :project)}) |> data_as_klist Table.insert(project, arke, data) {:ok, unit} end - defp handle_create(project, %{data: %{type: "arke"}} = arke, unit) do - case ArkeUnit.insert(project, arke, unit) do - {:ok, %{id: id, inserted_at: inserted_at, updated_at: updated_at}} -> - {:ok, - Arke.Core.Unit.update(unit, id: id, inserted_at: inserted_at, updated_at: updated_at)} - - {:error, errors} -> - {:error, errors} - end - end + defp handle_create(project, %{data: %{type: "arke"}} = arke, unit_list, opts), + do: ArkeUnit.insert(project, arke, unit_list, opts) - defp handle_create(proj, arke, unit) do + defp handle_create(_project, _arke, _unit, _opts) do {:error, "arke type not supported"} end - def update(project, %{arke_id: arke_id} = unit) do + def update(project, unit, opts \\ []) + + def update(project, %{arke_id: _arke_id} = unit, opts), + do: update(project, [unit], opts) + + def update(_project, [], _opts), do: {:ok, 0, [], []} + + def update(project, [%{arke_id: arke_id} | _] = unit_list, opts) do arke = Arke.Boundary.ArkeManager.get(arke_id, project) - {:ok, unit} = handle_update(project, arke, unit) + handle_update(project, arke, unit_list, opts) end + def handle_update( + project, + %{id: :arke_link} = arke, + unit_list, + opts + ), + do: ArkeLink.update(project, arke, unit_list, opts) + def handle_update( project, %{data: %{type: "table"}} = arke, - %{data: data, metadata: metadata} = unit + [%{data: data, metadata: metadata} = unit | _], + _opts ) do + # todo: handle bulk?
data = unit |> filter_primary_keys(false) @@ -140,21 +188,37 @@ defmodule ArkePostgres do {:ok, unit} end - def handle_update(project, %{data: %{type: "arke"}} = arke, unit) do - ArkeUnit.update(project, arke, unit) - {:ok, unit} - end + def handle_update(project, %{data: %{type: "arke"}} = arke, unit_list, opts), + do: ArkeUnit.update(project, arke, unit_list, opts) - def handle_update(_, _, _) do + def handle_update(_project, _arke, _unit, _opts) do {:error, "arke type not supported"} end - def delete(project, %{arke_id: arke_id} = unit) do + def delete(project, unit, opts \\ []) + + def delete(project, %{arke_id: arke_id} = unit, opts), do: delete(project, [unit], opts) + + def delete(_project, [], _opts), do: {:ok, nil} + + def delete(project, [%{arke_id: arke_id} | _] = unit_list, _opts) do arke = Arke.Boundary.ArkeManager.get(arke_id, project) - handle_delete(project, arke, unit) + handle_delete(project, arke, unit_list) end - defp handle_delete(project, %{data: %{type: "table"}} = arke, %{metadata: metadata} = unit) do + defp handle_delete( + project, + %{id: :arke_link} = arke, + unit_list + ), + do: ArkeLink.delete(project, arke, unit_list) + + defp handle_delete( + project, + %{data: %{type: "table"}} = arke, + [%{metadata: metadata} = unit | _] + ) do + # todo: handle bulk? metadata = Map.delete(metadata, :project) where = unit |> filter_primary_keys(true) |> Map.put_new(:metadata, metadata) |> data_as_klist @@ -165,11 +229,8 @@ defmodule ArkePostgres do end end - defp handle_delete(project, %{data: %{type: "arke"}} = arke, unit) do - case ArkeUnit.delete(project, arke, unit) do - {:ok, _} -> {:ok, nil} - {:error, msg} -> {:error, msg} - end + defp handle_delete(project, %{data: %{type: "arke"}} = arke, unit_list) do + ArkeUnit.delete(project, arke, unit_list) end defp handle_delete(_, _, _) do @@ -204,13 +265,6 @@ defmodule ArkePostgres do Enum.to_list(data) end - defp handle_changeset_errros(errors)when is_binary(errors), do: errors - defp handle_changeset_errros(errors) do - Enum.map(errors, fn {field, detail} -> - "#{field}: #{render_detail(detail)}" - end) - end - defp render_detail({message, values}) do Enum.reduce(values, message, fn {k, v}, acc -> String.replace(acc, "%{#{k}}", to_string(v)) @@ -234,15 +288,17 @@ defmodule ArkePostgres do IO.inspect("DBConnection.ConnectionError") %{message: message} = err parsed_message = %{context: "db_connection_error", message: "#{message}"} - IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ]) + IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan]) :error + err in Postgrex.Error -> IO.inspect("Postgrex.Error") - %{message: message,postgres: %{code: code, message: postgres_message}} = err + %{message: message, postgres: %{code: _code, message: postgres_message}} = err parsed_message = %{context: "postgrex_error", message: "#{message || postgres_message}"} - IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ]) + IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan]) :error - err -> + + err -> IO.inspect("uncatched error") IO.inspect(err) :error diff --git a/lib/arke_postgres/arke_link.ex b/lib/arke_postgres/arke_link.ex new file mode 100644 index 0000000..4782c1b --- /dev/null +++ b/lib/arke_postgres/arke_link.ex @@ -0,0 +1,117 @@ +# Copyright 2023 Arkemis S.r.l. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule ArkePostgres.ArkeLink do + import Ecto.Query, only: [from: 2, dynamic: 2, dynamic: 1] + alias Arke.Utils.ErrorGenerator, as: Error + + def get_all(project, _schema, fields, where \\ []) do + query = from(ArkePostgres.Tables.ArkeLink, select: ^fields, where: ^where) + ArkePostgres.Repo.all(query, prefix: project) + end + + def get_by(project, _schema, fields, where) do + query = from(ArkePostgres.Tables.ArkeLink, select: ^fields, where: ^where) + ArkePostgres.Repo.one(query, prefix: project) + end + + def insert(project, _schema, data, opts \\ []) do + records = + Enum.map(data, fn unit -> + %{ + parent_id: Map.get(unit.data, :parent_id), + child_id: Map.get(unit.data, :child_id), + type: Map.get(unit.data, :type), + metadata: Map.get(unit, :metadata) + } + end) + + case ArkePostgres.Repo.insert_all(ArkePostgres.Tables.ArkeLink, records, + prefix: project, + returning: [:child_id, :parent_id, :type] + ) do + {0, _} -> + {:error, Error.create(:insert, "no records inserted")} + + {count, inserted} -> + inserted_keys = + Enum.map(inserted, fn link -> {link.type, link.parent_id, link.child_id} end) + + {valid, errors} = + Enum.split_with(data, fn unit -> + {unit.data[:type], unit.data[:parent_id], unit.data[:child_id]} in inserted_keys + end) + + case opts[:bulk] do + true -> {:ok, count, valid, errors} + _ -> {:ok, List.first(valid)} + end + end + end + + def update(project, _schema, data, opts \\ []) do + records = + Enum.map(data, fn unit -> + %{ + parent_id: Map.get(unit.data, :parent_id), + child_id: Map.get(unit.data, :child_id), + type: Map.get(unit.data, :type), + metadata: Map.delete(Map.get(unit, :metadata), :project) + } + end) + + case ArkePostgres.Repo.insert_all(ArkePostgres.Tables.ArkeLink, records, + prefix: project, + returning: [:child_id, :parent_id, :type], + on_conflict: {:replace_all_except, [:type, :parent_id, :child_id, :id]}, + conflict_target: [:type, :parent_id, :child_id] + ) do + {0, _} -> + {:error, Error.create(:update, "no records updated")} + + {count, updated} -> + updated_keys = + Enum.map(updated, fn link -> {link.type, link.parent_id, link.child_id} end) + + {valid, errors} = + Enum.split_with(data, fn unit -> + {unit.data[:type], unit.data[:parent_id], unit.data[:child_id]} in updated_keys + end) + + case opts[:bulk] do + true -> {:ok, count, valid, errors} + _ -> {:ok, List.first(valid)} + end + end + end + + def delete(project, _schema, unit_list) do + where = + Enum.reduce(unit_list, dynamic(false), fn unit, acc -> + dynamic( + [a], + ^acc or + (a.parent_id == ^unit.data.parent_id and a.child_id == ^unit.data.child_id and + a.type == ^unit.data.type) + ) + end) + + query = from(ArkePostgres.Tables.ArkeLink, where: ^where) + + case ArkePostgres.Repo.delete_all(query, prefix: project) do + {0, nil} -> Error.create(:delete, "item not found") + _ -> {:ok, nil} + end + end +end diff --git a/lib/arke_postgres/arke_unit.ex b/lib/arke_postgres/arke_unit.ex index 1506055..3fca716 100644 --- a/lib/arke_postgres/arke_unit.ex +++ b/lib/arke_postgres/arke_unit.ex @@ -18,25 +18,63 @@ defmodule ArkePostgres.ArkeUnit do alias
Arke.Utils.ErrorGenerator, as: Error @record_fields [:id, :data, :metadata, :inserted_at, :updated_at] - - def insert(project, arke, %{data: data} = unit) do - row = [ - id: handle_id(unit.id), - arke_id: Atom.to_string(unit.arke_id), - data: encode_unit_data(arke, data), - metadata: unit.metadata, - inserted_at: unit.inserted_at, - updated_at: unit.updated_at - ] - - case ArkePostgres.Repo.insert(ArkePostgres.Tables.ArkeUnit.changeset(Enum.into(row, %{})), - prefix: project - ) do - {:ok, record} -> - {:ok, record} - - {:error, changeset} -> - {:error, changeset.errors} + @chunk_size 5000 + + def insert(project, arke, unit_list, opts \\ []) do + now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second) + + %{unit_list: updated_unit_list, records: records} = + Enum.reduce(unit_list, %{unit_list: [], records: []}, fn unit, acc -> + id = handle_id(unit.id) + + updated_unit = + unit |> Map.put(:id, id) |> Map.put(:inserted_at, now) |> Map.put(:updated_at, now) + + acc + |> Map.put(:unit_list, [updated_unit | acc.unit_list]) + |> Map.put(:records, [ + %{ + id: id, + arke_id: Atom.to_string(unit.arke_id), + data: encode_unit_data(arke, unit.data), + metadata: unit.metadata, + inserted_at: now, + updated_at: now + } + | acc.records + ]) + end) + + {total_count, all_inserted} = + Stream.chunk_every(records, @chunk_size) + |> Enum.reduce({0, []}, fn chunk, {count_acc, inserted_acc} -> + case ArkePostgres.Repo.insert_all( + ArkePostgres.Tables.ArkeUnit, + chunk, + prefix: project, + returning: true + ) do + {chunk_count, chunk_inserted} -> + {count_acc + chunk_count, inserted_acc ++ chunk_inserted} + end + end) + + case total_count do + 0 -> + {:error, Error.create(:insert, "no records inserted")} + + count -> + inserted_ids = Enum.map(all_inserted, & &1.id) + + {valid, errors} = + Enum.split_with(updated_unit_list, fn unit -> + unit.id in inserted_ids + end) + + case opts[:bulk] do + true -> {:ok, count, valid, errors} + _ -> {:ok, List.first(valid)} + end end end @@ -46,30 +84,78 @@ defmodule ArkePostgres.ArkeUnit do # TODO handle error defp handle_id(id), do: id - def update(project, arke, %{data: data} = unit, where \\ []) do - where = Keyword.put_new(where, :arke_id, Atom.to_string(unit.arke_id)) - where = Keyword.put_new(where, :id, Atom.to_string(unit.id)) - - row = [ - data: encode_unit_data(arke, data), - metadata: Map.delete(unit.metadata, :project), - updated_at: unit.updated_at - ] - - query = from("arke_unit", where: ^where, update: [set: ^row]) - ArkePostgres.Repo.update_all(query, [], prefix: project) + def update(project, arke, unit_list, opts) do + records = + Enum.map(unit_list, fn unit -> + %{ + id: to_string(unit.id), + arke_id: to_string(arke.id), + data: encode_unit_data(arke, unit.data), + metadata: Map.delete(unit.metadata, :project), + inserted_at: DateTime.to_naive(unit.inserted_at) |> NaiveDateTime.truncate(:second), + updated_at: DateTime.to_naive(unit.updated_at) + } + end) + + {total_count, all_updated} = + Stream.chunk_every(records, @chunk_size) + |> Enum.reduce({0, []}, fn chunk, {count_acc, updated_acc} -> + case ArkePostgres.Repo.insert_all( + ArkePostgres.Tables.ArkeUnit, + chunk, + prefix: project, + on_conflict: {:replace_all_except, [:id]}, + conflict_target: :id, + returning: true + ) do + {chunk_count, chunk_updated} -> + {count_acc + chunk_count, updated_acc ++ chunk_updated} + end + end) + + case total_count do + 0 -> + {:error, Error.create(:update, "no records updated")} + + count -> + updated_ids = Enum.map(all_updated, & &1.id) + + {valid, 
errors} = + Enum.split_with(unit_list, fn unit -> + to_string(unit.id) in updated_ids + end) + + case opts[:bulk] do + true -> {:ok, count, valid, errors} + _ -> {:ok, List.first(valid)} + end + end end - def delete(project, arke, unit) do - where = [arke_id: Atom.to_string(arke.id), id: Atom.to_string(unit.id)] - query = from(a in "arke_unit", where: ^where) - - case ArkePostgres.Repo.delete_all(query, prefix: project) do - {0, nil} -> - Error.create(:delete, "item not found") - - _ -> - {:ok, nil} + def delete(project, arke, unit_list) do + {total_count, _} = + Stream.chunk_every(Enum.map(unit_list, &Atom.to_string(&1.id)), @chunk_size) + |> Enum.reduce({0, nil}, fn chunk, {count_acc, _} -> + query = + from(a in "arke_unit", + where: a.arke_id == ^Atom.to_string(arke.id), + where: a.id in ^chunk + ) + + case ArkePostgres.Repo.delete_all(query, prefix: project) do + {chunk_count, nil} -> {count_acc + chunk_count, nil} + end + end) + + case total_count do + 0 -> Error.create(:delete, "item not found") + _ -> {:ok, nil} end end diff --git a/lib/arke_postgres/query.ex b/lib/arke_postgres/query.ex index 4d867f9..8523ece 100644 --- a/lib/arke_postgres/query.ex +++ b/lib/arke_postgres/query.ex @@ -22,7 +23,10 @@ defmodule ArkePostgres.Query do %{filters: filters, orders: orders, offset: offset, limit: limit} = arke_query, action ) do + paths = extract_paths(filters) ++ extract_paths(orders) + base_query(arke_query, action) + |> handle_paths_join(paths) |> handle_filters(filters) |> handle_orders(orders) |> handle_offset(offset) @@ -49,13 +53,16 @@ defmodule ArkePostgres.Query do def execute(query, :pseudo_query), do: generate_query(query, :pseudo_query) - def get_column(%{data: %{persistence: "arke_parameter"}} = parameter), - do: get_arke_column(parameter) + def get_column(column), do: get_column(column, false) + + def get_column(%{data: %{persistence: "arke_parameter"}} = parameter, joined), + do: get_arke_column(parameter, joined) - def get_column(%{data: %{persistence: "table_column"}} = parameter), + def get_column(%{data: %{persistence: "table_column"}} = parameter, _joined), do: get_table_column(parameter) def remove_arke_system(metadata, project_id) when project_id == :arke_system, do: metadata + def remove_arke_system(metadata, project_id) do case Map.get(metadata, "project") do "arke_system" -> Map.delete(metadata, "project") @@ -72,7 +79,10 @@ defmodule ArkePostgres.Query do end def get_manager_units(project_id) do - arke_link =%{id: :arke_link, data: %{parameters: [%{id: :type},%{id: :child_id},%{id: :parent_id},%{id: :metadata}]}} + arke_link = %{ + id: :arke_link, + data: %{parameters: [%{id: :type}, %{id: :child_id}, %{id: :parent_id}, %{id: :metadata}]} + } links = from(q in table_query(arke_link, nil), where: q.type in ["parameter", "group"]) @@ -81,7 +91,6 @@ defmodule ArkePostgres.Query do parameter_links = Enum.filter(links, fn x -> x.type == "parameter" end) group_links = Enum.filter(links, fn x -> x.type == "group" end) - parameters_id = Arke.Utils.DefaultData.get_parameters_id() list_arke_id = Arke.Utils.DefaultData.get_arke_id() @@ -93,13 +102,15 @@ defmodule ArkePostgres.Query do units_map = Map.new(unit_list,
&{&1.id, &1.metadata}) parameter_links = merge_unit_metadata(parameter_links, units_map, project_id) - parameters = parse_parameters(Enum.filter(unit_list, fn u -> u.arke_id in parameters_id end), project_id) + parameters = + parse_parameters(Enum.filter(unit_list, fn u -> u.arke_id in parameters_id end), project_id) arke_list = parse_arke_list( Enum.filter(unit_list, fn u -> u.arke_id == "arke" end), parameter_links ) + groups = parse_groups( Enum.filter(unit_list, fn u -> u.arke_id == "group" end), @@ -123,27 +134,45 @@ defmodule ArkePostgres.Query do Enum.filter(parameter_links, fn x -> x.parent_id == id end), [], fn p, new_params -> - [%{id: String.to_atom(p.child_id), metadata: Enum.reduce(p.metadata,%{}, fn {key, val}, acc -> Map.put(acc, String.to_atom(key), val) end)} | new_params] + [ + %{ + id: String.to_atom(p.child_id), + metadata: + Enum.reduce(p.metadata, %{}, fn {key, val}, acc -> + Map.put(acc, String.to_atom(key), val) + end) + } + | new_params + ] end ) - updated_data = Enum.reduce(unit.data,%{}, fn {k,db_data},acc -> Map.put(acc,String.to_atom(k),db_data["value"]) end) + + updated_data = + Enum.reduce(unit.data, %{}, fn {k, db_data}, acc -> + Map.put(acc, String.to_atom(k), db_data["value"]) + end) |> Map.put(:id, id) |> Map.put(:metadata, metadata) - |> Map.update(:parameters,[], fn current -> params ++ current end) + |> Map.update(:parameters, [], fn current -> params ++ current end) - [ updated_data | new_arke_list] + [updated_data | new_arke_list] end) end - defp parse_parameters(parameter_list, project_id)do - Enum.reduce(parameter_list, [], fn %{id: id, arke_id: arke_id, metadata: metadata} = unit, new_parameter_list -> + defp parse_parameters(parameter_list, project_id) do + Enum.reduce(parameter_list, [], fn %{id: id, arke_id: arke_id, metadata: metadata} = unit, + new_parameter_list -> parsed_metadata = remove_arke_system(metadata, project_id) - updated_data = Enum.reduce(unit.data,%{}, fn {k,db_data},acc -> Map.put(acc,String.to_atom(k),db_data["value"]) end) - |> Map.put(:id, id) - |> Map.put(:type, arke_id) - |> Map.put(:metadata, parsed_metadata) - [ updated_data | new_parameter_list] + updated_data = + Enum.reduce(unit.data, %{}, fn {k, db_data}, acc -> + Map.put(acc, String.to_atom(k), db_data["value"]) + end) + |> Map.put(:id, id) + |> Map.put(:type, arke_id) + |> Map.put(:metadata, parsed_metadata) + + [updated_data | new_parameter_list] end) end @@ -157,23 +186,31 @@ defmodule ArkePostgres.Query do [%{id: String.to_atom(p.child_id), metadata: p.metadata} | new_params] end ) - updated_data = Enum.reduce(unit.data,%{}, fn {k,db_data},acc -> Map.put(acc,String.to_atom(k),db_data["value"]) end) - |> Map.put(:id, id) - |> Map.put(:metadata, metadata) - |> Map.update(:arke_list,[], fn db_arke_list -> - Enum.reduce(db_arke_list,[], fn key,acc -> - case Enum.find(arke_list, fn %{id: id, metadata: _metadata} -> to_string(id) == key end) do - nil -> [key|acc] - data -> - [data|acc] - end - end) - end) - [ updated_data | new_groups] + + updated_data = + Enum.reduce(unit.data, %{}, fn {k, db_data}, acc -> + Map.put(acc, String.to_atom(k), db_data["value"]) + end) + |> Map.put(:id, id) + |> Map.put(:metadata, metadata) + |> Map.update(:arke_list, [], fn db_arke_list -> + Enum.reduce(db_arke_list, [], fn key, acc -> + case Enum.find(arke_list, fn %{id: id, metadata: _metadata} -> + to_string(id) == key + end) do + nil -> + [key | acc] + + data -> + [data | acc] + end + end) + end) + + [updated_data | new_groups] end) end - 
###################################################################################################################### # PRIVATE FUNCTIONS ################################################################################################## ###################################################################################################################### @@ -182,17 +219,31 @@ defmodule ArkePostgres.Query do defp base_query(%{link: nil} = _arke_query, action), do: arke_query(action) - defp base_query(%{link: %{unit: %{id: link_id},depth: depth, direction: direction,type: type}, project: project} = _arke_query, action), - do: - get_nodes( - project, - action, - [to_string(link_id)], - depth, - direction, - type - ) - defp base_query(%{link: %{unit: unit_list,depth: depth, direction: direction,type: type}, project: project} = _arke_query, action) when is_list(unit_list) do + defp base_query( + %{ + link: %{unit: %{id: link_id}, depth: depth, direction: direction, type: type}, + project: project + } = _arke_query, + action + ), + do: + get_nodes( + project, + action, + [to_string(link_id)], + depth, + direction, + type + ) + + defp base_query( + %{ + link: %{unit: unit_list, depth: depth, direction: direction, type: type}, + project: project + } = _arke_query, + action + ) + when is_list(unit_list) do get_nodes( project, action, @@ -261,35 +312,101 @@ defmodule ArkePostgres.Query do Arke.Core.Unit.load(arke, record) end + defp handle_paths_join(query, []), do: query + + defp handle_paths_join(query, paths) do + conditions = + Enum.reduce(paths, nil, fn path, acc -> + condition = dynamic([q, j], ^get_column(List.first(path)) == j.id) + if is_nil(acc), do: condition, else: dynamic([q, j], ^acc or ^condition) + end) + + from(q in query, left_join: j in "arke_unit", on: ^conditions) + end + + defp extract_paths(items) do + items + |> Enum.flat_map(&extract_path/1) + |> Enum.reject(&(is_nil(&1) or length(&1) == 0)) + |> Enum.uniq() + end + + def extract_path(%Arke.Core.Query.Filter{base_filters: base_filters}), + do: Enum.flat_map(base_filters, &extract_path/1) + + def extract_path(%Arke.Core.Query.BaseFilter{path: path}), do: [path] + def extract_path(%Arke.Core.Query.Order{path: path}), do: [path] + + def extract_path(_), do: [] + def handle_filters(query, filters) do Enum.reduce(filters, query, fn %{logic: logic, negate: negate, base_filters: base_filters}, - new_query -> - clause = handle_condition(logic, base_filters) |> handle_negate_condition(negate) - from(q in new_query, where: ^clause) + query -> + clause = build_filter_clause(logic, base_filters) |> handle_negate_condition(negate) + from(q in query, where: ^clause) end) end - defp handle_condition(logic, base_filters) do - Enum.reduce(base_filters, nil, fn %{ - parameter: parameter, - operator: operator, - value: value, - negate: negate - }, - clause -> - column = get_column(parameter) - value = get_value(parameter, value) - + defp build_filter_clause(parent_logic, filters), + do: build_filter_clause(nil, parent_logic, filters) + + defp build_filter_clause(clause, parent_logic, filters), + do: Enum.reduce(filters, clause, &handle_clause(&1, &2, parent_logic)) + + defp handle_clause( + %Arke.Core.Query.Filter{logic: logic, base_filters: nested_filters}, + clause, + parent_logic + ), + do: + build_filter_clause(nil, logic, nested_filters) + |> add_condition_to_clause(clause, parent_logic) + + defp handle_clause( + %Arke.Core.Query.BaseFilter{ + parameter: parameter, + operator: operator, + value: value, + negate: negate, + path: [] + }, + 
clause, + parent_logic + ), + do: + parameter_condition(clause, parameter, value, operator, negate, parent_logic) + |> add_condition_to_clause(clause, parent_logic) + + defp handle_clause( + %Arke.Core.Query.BaseFilter{ + parameter: parameter, + operator: operator, + value: value, + negate: negate, + path: [path_parameter | _] + }, + clause, + parent_logic + ) + when not is_nil(path_parameter), + do: + parameter_condition(clause, parameter, value, operator, negate, parent_logic, true) + |> add_nested_condition_to_clause(clause, parent_logic) + + defp handle_clause(_, clause, _), do: clause + + defp parameter_condition(_clause, parameter, value, operator, negate, _logic, joined \\ false) do + column = get_column(parameter, joined) + value = get_value(parameter, value) + + condition = if is_nil(value) or operator == :isnull do - condition = get_nil_query(parameter, column) |> handle_negate_condition(negate) - add_condition_to_clause(condition, clause, logic) + get_nil_query(parameter, column, negate, joined) else - condition = - filter_query_by_operator(parameter, column, value, operator) |> handle_negate_condition(negate) - - add_condition_to_clause(condition, clause, logic) + filter_query_by_operator(parameter, column, value, operator) end - end) + + handle_negate_condition(condition, negate) end defp handle_negate_condition(condition, true), do: dynamic([q], not (^condition)) @@ -299,10 +416,20 @@ defmodule ArkePostgres.Query do defp add_condition_to_clause(condition, clause, :and), do: dynamic([q], ^clause and ^condition) defp add_condition_to_clause(condition, clause, :or), do: dynamic([q], ^clause or ^condition) + defp add_nested_condition_to_clause(condition, nil, _), do: dynamic([_q, j], ^condition) + + defp add_nested_condition_to_clause(condition, clause, :and), + do: dynamic([_q, j], ^clause and ^condition) + + defp add_nested_condition_to_clause(condition, clause, :or), + do: dynamic([_q, j], ^clause or ^condition) + defp handle_orders(query, orders) do order_by = - Enum.reduce(orders, [], fn %{parameter: parameter, direction: direction}, new_order_by -> - column = get_column(parameter) + Enum.reduce(orders, [], fn %{parameter: parameter, direction: direction, path: path}, + new_order_by -> + joined = length(path) > 0 + column = get_column(parameter, joined) [{direction, column} | new_order_by] end) @@ -317,61 +444,152 @@ defmodule ArkePostgres.Query do defp get_table_column(%{id: id} = _parameter), do: dynamic([q], fragment("?", field(q, ^id))) - defp get_arke_column(%{id: id, data: %{multiple: true}} = _parameter), - do: dynamic([q], fragment("(? -> ? ->> 'value')::jsonb", field(q, :data), ^Atom.to_string(id))) + defp get_arke_column(%{id: id, data: %{multiple: true}} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? ->> 'value')::jsonb", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, data: %{multiple: true}} = _parameter, _joined), + do: + dynamic([q], fragment("(? -> ? ->> 'value')::jsonb", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :string} = _parameter), + defp get_arke_column(%{id: id, arke_id: :string} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::text", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :string} = _parameter, _joined), do: dynamic([q], fragment("(? -> ?
->> 'value')::text", field(q, :data), ^Atom.to_string(id))) + defp get_arke_column(%{id: id, arke_id: :atom} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::text", field(j, :data), ^Atom.to_string(id)) + ) - defp get_arke_column(%{id: id, arke_id: :atom} = _parameter), + defp get_arke_column(%{id: id, arke_id: :atom} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::text", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :boolean} = _parameter), + defp get_arke_column(%{id: id, arke_id: :boolean} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::boolean", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :boolean} = _parameter, _joined), do: dynamic( [q], fragment("(? -> ? ->> 'value')::boolean", field(q, :data), ^Atom.to_string(id)) ) - defp get_arke_column(%{id: id, arke_id: :datetime} = _parameter), + defp get_arke_column(%{id: id, arke_id: :datetime} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? ->> 'value')::timestamp", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :datetime} = _parameter, _joined), do: dynamic( [q], fragment("(? -> ? ->> 'value')::timestamp", field(q, :data), ^Atom.to_string(id)) ) - defp get_arke_column(%{id: id, arke_id: :date} = _parameter), + defp get_arke_column(%{id: id, arke_id: :date} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? ->> 'value')::date", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :date} = _parameter, _joined), do: dynamic( [q], fragment("(? -> ? ->> 'value')::date", field(q, :data), ^Atom.to_string(id)) ) - defp get_arke_column(%{id: id, arke_id: :time} = _parameter), + defp get_arke_column(%{id: id, arke_id: :time} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::time", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :time} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::time", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :integer} = _parameter), + defp get_arke_column(%{id: id, arke_id: :integer} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? ->> 'value')::integer", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :integer} = _parameter, _joined), do: dynamic( [q], fragment("(? -> ? ->> 'value')::integer", field(q, :data), ^Atom.to_string(id)) ) - defp get_arke_column(%{id: id, arke_id: :float} = _parameter), + defp get_arke_column(%{id: id, arke_id: :float} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::float", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :float} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::float", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :dict} = _parameter), + defp get_arke_column(%{id: id, arke_id: :dict} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? ->> 'value')::JSON", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :dict} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? 
->> 'value')::JSON", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :list} = _parameter), + defp get_arke_column(%{id: id, arke_id: :list} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::JSON", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :list} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::JSON", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :link} = _parameter), + defp get_arke_column(%{id: id, arke_id: :link} = _parameter, true), + do: + dynamic( + [_q, ..., j], + fragment("(? -> ? ->> 'value')::text", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :link} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::text", field(q, :data), ^Atom.to_string(id))) - defp get_arke_column(%{id: id, arke_id: :dynamic} = _parameter), + defp get_arke_column(%{id: id, arke_id: :dynamic} = _parameter, true), + do: + dynamic( + [_, ..., j], + fragment("(? -> ? ->> 'value')::text", field(j, :data), ^Atom.to_string(id)) + ) + + defp get_arke_column(%{id: id, arke_id: :dynamic} = _parameter, _joined), do: dynamic([q], fragment("(? -> ? ->> 'value')::text", field(q, :data), ^Atom.to_string(id))) defp get_value(_parameter, value) when is_nil(value), do: value @@ -464,15 +682,32 @@ defmodule ArkePostgres.Query do end end - defp get_nil_query(%{id: id} = _parameter, column), + defp get_nil_query(%{id: id} = _parameter, column, false, false), do: dynamic( [q], - fragment("? IS NULL AND (data \\? ?)", ^column, ^Atom.to_string(id)) + fragment("? IS NULL AND (?.data \\? ?)", ^column, q, ^Atom.to_string(id)) ) - defp filter_query_by_operator(%{data: %{multiple: true}}, column, value, :eq), do: dynamic([q], fragment("jsonb_exists(?, ?)", ^column, ^value)) - defp filter_query_by_operator(parameter, column, value, :eq), do: dynamic([q], ^column == ^value) + defp get_nil_query(%{id: id} = _parameter, column, false, true), + do: + dynamic( + [q, ..., j], + fragment("? IS NULL AND (?.data \\? 
?)", ^column, j, ^Atom.to_string(id)) + ) + + defp get_nil_query(%{id: id} = _parameter, column, true, _joined), + do: + dynamic( + [q], + is_nil(^column) + ) + + defp filter_query_by_operator(%{data: %{multiple: true}}, column, value, :eq), + do: dynamic([q], fragment("jsonb_exists(?, ?)", ^column, ^value)) + + defp filter_query_by_operator(parameter, column, value, :eq), + do: dynamic([q], ^column == ^value) defp filter_query_by_operator(parameter, column, value, :contains), do: dynamic([q], like(^column, fragment("?", ^("%" <> value <> "%")))) @@ -492,11 +727,18 @@ defmodule ArkePostgres.Query do defp filter_query_by_operator(parameter, column, value, :istartswith), do: dynamic([q], ilike(^column, fragment("?", ^(value <> "%")))) - defp filter_query_by_operator(parameter, column, value, :lte), do: dynamic([q], ^column <= ^value) + defp filter_query_by_operator(parameter, column, value, :lte), + do: dynamic([q], ^column <= ^value) + defp filter_query_by_operator(parameter, column, value, :lt), do: dynamic([q], ^column < ^value) defp filter_query_by_operator(parameter, column, value, :gt), do: dynamic([q], ^column > ^value) - defp filter_query_by_operator(parameter, column, value, :gte), do: dynamic([q], ^column >= ^value) - defp filter_query_by_operator(parameter, column, value, :in), do: dynamic([q], ^column in ^value) + + defp filter_query_by_operator(parameter, column, value, :gte), + do: dynamic([q], ^column >= ^value) + + defp filter_query_by_operator(parameter, column, value, :in), + do: dynamic([q], ^column in ^value) + defp filter_query_by_operator(parameter, column, value, _), do: dynamic([q], ^column == ^value) # defp filter_query_by_operator(query, key, value, "between"), do: from q in query, where: column_table(q, ^key) == ^value @@ -532,7 +774,9 @@ defmodule ArkePostgres.Query do where_field = get_where_field_by_direction(direction) |> get_where_condition_by_type(type) get_link_query(action, project, unit_id, link_field, tree_field, depth, where_field) end - def get_nodes(project, action, unit_id, depth, direction, type), do: get_nodes(project, action, [unit_id], depth, direction, type) + + def get_nodes(project, action, unit_id, depth, direction, type), + do: get_nodes(project, action, [unit_id], depth, direction, type) defp get_project(project) when is_atom(project), do: Atom.to_string(project) defp get_project(project), do: project @@ -578,40 +822,43 @@ defmodule ArkePostgres.Query do end defp get_link_query(_action, project, unit_id_list, link_field, tree_field, depth, where_field) do - q = from(r in from(a in "arke_unit", - left_join: - cte in fragment( - @raw_cte_query, - literal(^link_field), - literal(^project), - literal(^link_field), - ^unit_id_list, - literal(^project), - literal(^project), - literal(^project), - literal(^project), - literal(^project), - literal(^project), - literal(^link_field), - literal(^tree_field), - ^depth - ), - where: ^where_field, - distinct: [a.id, cte.starting_unit], - select: %{ - id: a.id, - arke_id: a.arke_id, - data: a.data, - metadata: a.metadata, - inserted_at: a.inserted_at, - updated_at: a.updated_at, - depth: cte.depth, - link_metadata: cte.metadata, - link_type: cte.type, - starting_unit: cte.starting_unit - } - )) - from x in subquery(q), select: x - end + q = + from( + r in from(a in "arke_unit", + left_join: + cte in fragment( + @raw_cte_query, + literal(^link_field), + literal(^project), + literal(^link_field), + ^unit_id_list, + literal(^project), + literal(^project), + literal(^project), + literal(^project), + 
literal(^project), + literal(^project), + literal(^link_field), + literal(^tree_field), + ^depth + ), + where: ^where_field, + distinct: [a.id, cte.starting_unit], + select: %{ + id: a.id, + arke_id: a.arke_id, + data: a.data, + metadata: a.metadata, + inserted_at: a.inserted_at, + updated_at: a.updated_at, + depth: cte.depth, + link_metadata: cte.metadata, + link_type: cte.type, + starting_unit: cte.starting_unit + } + ) + ) + from(x in subquery(q), select: x) + end end diff --git a/lib/arke_postgres/tables/arke_link.ex b/lib/arke_postgres/tables/arke_link.ex index b7558c5..d48e986 100644 --- a/lib/arke_postgres/tables/arke_link.ex +++ b/lib/arke_postgres/tables/arke_link.ex @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -defmodule ArkePostgres.ArkeLink do +defmodule ArkePostgres.Tables.ArkeLink do use Ecto.Schema import Ecto.Changeset @@ -20,12 +20,14 @@ defmodule ArkePostgres.ArkeLink do @foreign_key_type :string schema "arke_link" do - field(:type, :string, default: "link") - belongs_to(:parent_id, ArkePostgres.ArkeUnit, primary_key: true) - belongs_to(:child_id, ArkePostgres.ArkeUnit, primary_key: true) + field(:type, :string, default: "link", primary_key: true) + belongs_to(:parent, ArkePostgres.Tables.ArkeUnit, primary_key: true, foreign_key: :parent_id) + belongs_to(:child, ArkePostgres.Tables.ArkeUnit, primary_key: true, foreign_key: :child_id) field(:metadata, :map, default: %{}) end + # TODO: add insert_all validation + def changeset(args \\ []) do %__MODULE__{} |> cast(args, @arke_link_fields) diff --git a/lib/arke_postgres/tables/arke_unit.ex b/lib/arke_postgres/tables/arke_unit.ex index f2823a5..b9427c2 100644 --- a/lib/arke_postgres/tables/arke_unit.ex +++ b/lib/arke_postgres/tables/arke_unit.ex @@ -27,6 +27,8 @@ defmodule ArkePostgres.Tables.ArkeUnit do timestamps() end + # TODO: add insert_all validation + def changeset(args \\ []) do %__MODULE__{} |> cast(args, @arke_record_fields) diff --git a/mix.exs b/mix.exs index c3c524c..c7dcca3 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule ArkePostgres.MixProject do use Mix.Project - @version "0.3.7" + @version "0.4.0-bulk.0" @scm_url "https://github.com/arkemishub/arke-postgres" @site_url "https://arkehub.com" @@ -31,6 +31,7 @@ defmodule ArkePostgres.MixProject do mod: {ArkePostgres.Application, []} ] end + defp versioning do [ tag_prefix: "v", diff --git a/priv/repo/migrations/20220610104406_initial_migration.exs b/priv/repo/migrations/20220610104406_initial_migration.exs index 4d6a099..2a3c55c 100644 --- a/priv/repo/migrations/20220610104406_initial_migration.exs +++ b/priv/repo/migrations/20220610104406_initial_migration.exs @@ -3,21 +3,29 @@ defmodule ArkePostgres.Repo.Migrations.InitialMigration do def change do create table(:arke_unit, primary_key: false) do - add :id, :string, primary_key: true - add :arke_id, :string, null: false - add :data, :map, default: %{}, null: false - add :metadata, :map, default: %{}, null: false + add(:id, :string, primary_key: true) + add(:arke_id, :string, null: false) + add(:data, :map, default: %{}, null: false) + add(:metadata, :map, default: %{}, null: false) timestamps() end create table(:arke_link, primary_key: false) do - add :type, :string, default: "link", null: false - add :parent_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all), primary_key: true - add :child_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all), primary_key: true - add :metadata, :map,
default: %{} + add(:type, :string, default: "link", null: false, primary_key: true) + + add(:parent_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all), + primary_key: true + ) + + add(:child_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all), + primary_key: true + ) + + add(:metadata, :map, default: %{}) end - create index(:arke_link, :parent_id) - create index(:arke_link, :child_id) + create(index(:arke_unit, :arke_id)) + create(index(:arke_link, :parent_id)) + create(index(:arke_link, :child_id)) end end
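
Reviewer note — a minimal usage sketch of the bulk API introduced in lib/arke_postgres.ex. The :my_project atom and the unit variables are illustrative placeholders; units are assumed to be Arke.Core.Unit structs built elsewhere (e.g. via Arke.Core.Unit.load/2, as used in Query.load_units/1):

    # Single-unit calls keep the old shape.
    {:ok, unit} = ArkePostgres.create(:my_project, unit)

    # A list plus bulk: true returns the written count and the valid/failed
    # split; for arke-type units the rows go through insert_all in chunks
    # of @chunk_size (5_000).
    {:ok, count, valid, errors} = ArkePostgres.create(:my_project, unit_list, bulk: true)
    {:ok, count, valid, errors} = ArkePostgres.update(:my_project, unit_list, bulk: true)

    # delete/3 accepts a list as well and returns {:ok, nil} on success.
    {:ok, nil} = ArkePostgres.delete(:my_project, unit_list)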
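
Reviewer note — ArkeUnit.update/4 and ArkeLink.update/4 implement bulk update as an upsert rather than per-row UPDATEs. A self-contained sketch of the pattern as used for arke units (the rows variable is an illustrative list of maps shaped like the records built in update/4):

    # INSERT ... ON CONFLICT (id) DO UPDATE SET ...: every column of a
    # conflicting row is overwritten except the primary key, and
    # returning: true hands back the rows that were actually written.
    ArkePostgres.Repo.insert_all(ArkePostgres.Tables.ArkeUnit, rows,
      prefix: project,
      on_conflict: {:replace_all_except, [:id]},
      conflict_target: :id,
      returning: true
    )

One review-worthy consequence of this choice: an id that does not exist yet is inserted rather than rejected, so an "update" of a missing unit silently creates it.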
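
Reviewer note — the new path support in Query: when a BaseFilter or Order carries a non-empty path, generate_query/2 collects the paths (extract_paths/1) and adds a single left join back onto arke_unit (handle_paths_join/2); the joined: true variants of get_arke_column/2 then read from the join binding j instead of the root binding q. Roughly the shape in Ecto terms, where "owner" stands for a hypothetical path-head parameter:

    # The linked unit's id is stored as a jsonb value on the parent row,
    # so the join condition extracts it as text and matches it against j.id.
    from(q in "arke_unit",
      left_join: j in "arke_unit",
      on: fragment("(? -> ? ->> 'value')::text", q.data, ^"owner") == j.id
    )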