26 changes: 25 additions & 1 deletion lib/arke_postgres.ex
@@ -14,7 +14,7 @@

defmodule ArkePostgres do
alias Arke.Boundary.{GroupManager, ArkeManager}
alias ArkePostgres.{Table, ArkeUnit, Query}
alias ArkePostgres.{Table, ArkeUnit, ArkeLink, Query}

def init() do
case check_env() do
@@ -112,6 +112,14 @@ defmodule ArkePostgres do
end
end

defp handle_create(
project,
%{id: :arke_link} = arke,
unit_list,
opts
),
do: ArkeLink.insert(project, arke, unit_list, opts)

defp handle_create(
project,
%{data: %{type: "table"}} = arke,
@@ -120,6 +128,7 @@
) do
# todo: handle bulk?
# todo: remove once the project is not needed anymore

data = data |> Map.merge(%{metadata: Map.delete(metadata, :project)}) |> data_as_klist
Table.insert(project, arke, data)
{:ok, unit}
Expand All @@ -144,6 +153,14 @@ defmodule ArkePostgres do
handle_update(project, arke, unit_list, opts)
end

def handle_update(
project,
%{id: :arke_link} = arke,
unit_list,
opts
),
do: ArkeLink.update(project, arke, unit_list, opts)

def handle_update(
project,
%{data: %{type: "table"}} = arke,
@@ -182,6 +199,13 @@
handle_delete(project, arke, unit_list)
end

defp handle_delete(
project,
%{id: :arke_link} = arke,
unit_list
),
do: ArkeLink.delete(project, arke, unit_list)

defp handle_delete(
project,
%{data: %{type: "table"}} = arke,
117 changes: 117 additions & 0 deletions lib/arke_postgres/arke_link.ex
@@ -0,0 +1,117 @@
# Copyright 2023 Arkemis S.r.l.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule ArkePostgres.ArkeLink do
import Ecto.Query, only: [from: 2, dynamic: 2, dynamic: 1]
alias Arke.Utils.ErrorGenerator, as: Error

def get_all(project, schema, fields, where \\ []) do
query = from(ArkePostgres.Tables.ArkeLink, select: ^fields, where: ^where)
ArkePostgres.Repo.all(query, prefix: project)
end

def get_by(project, schema, fields, where) do
query = from(ArkePostgres.Tables.ArkeLink, select: ^fields, where: ^where)
ArkePostgres.Repo.one(query, prefix: project)
end

def insert(project, schema, data, opts \\ []) do
records =
Enum.map(data, fn unit ->
%{
parent_id: Map.get(unit.data, :parent_id),
child_id: Map.get(unit.data, :child_id),
type: Map.get(unit.data, :type),
metadata: Map.get(unit, :metadata)
}
end)

case ArkePostgres.Repo.insert_all(ArkePostgres.Tables.ArkeLink, records,
prefix: project,
returning: [:child_id, :parent_id, :type]
) do
{0, _} ->
{:error, Error.create(:insert, "no records inserted")}

{count, inserted} ->
inserted_keys =
Enum.map(inserted, fn link -> {link.type, link.parent_id, link.child_id} end)

{valid, errors} =
Enum.split_with(data, fn unit ->
{unit.data[:type], unit.data[:parent_id], unit.data[:child_id]} in inserted_keys
end)

case opts[:bulk] do
true -> {:ok, count, valid, errors}
_ -> {:ok, List.first(valid)}
end
end
end

def update(project, schema, data, opts \\ []) do
records =
Enum.map(data, fn unit ->
%{
parent_id: Map.get(unit.data, :parent_id),
child_id: Map.get(unit.data, :child_id),
type: Map.get(unit.data, :type),
metadata: Map.delete(Map.get(unit, :metadata), :project)
}
end)

case ArkePostgres.Repo.insert_all(ArkePostgres.Tables.ArkeLink, records,
prefix: project,
returning: [:child_id, :parent_id, :type],
on_conflict: {:replace_all_except, [:type, :parent_id, :child_id, :id]},
conflict_target: [:type, :parent_id, :child_id]
) do
{0, _} ->
{:error, Error.create(:update, "no records updated")}

{count, updated} ->
updated_keys =
Enum.map(updated, fn link -> {link.type, link.parent_id, link.child_id} end)

{valid, errors} =
Enum.split_with(data, fn unit ->
{unit.data[:type], unit.data[:parent_id], unit.data[:child_id]} in updated_keys
end)

case opts[:bulk] do
true -> {:ok, count, valid, errors}
_ -> {:ok, List.first(valid)}
end
end
end

def delete(project, schema, unit_list) do
where =
Enum.reduce(unit_list, dynamic(false), fn unit, dynamic ->
dynamic(
[a],
^dynamic or
(a.parent_id == ^unit.data.parent_id and a.child_id == ^unit.data.child_id and
a.type == ^unit.data.type)
)
end)

query = from(ArkePostgres.Tables.ArkeLink, where: ^where)

case ArkePostgres.Repo.delete_all(query, prefix: project) do
{0, nil} -> Error.create(:delete, "item not found")
_ -> {:ok, nil}
end
end
end
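
For reference, a hypothetical usage sketch of the new module (the project name, arke struct, and unit shapes below are illustrative assumptions, not part of this PR); the :bulk option switches the return shape from a single unit to {:ok, count, valid, errors}:

# Hypothetical example — "my_project", the arke struct and the unit maps are assumptions.
arke = %{id: :arke_link}

units = [
  %{data: %{parent_id: "user_1", child_id: "group_1", type: "group"}, metadata: %{}},
  %{data: %{parent_id: "user_2", child_id: "group_1", type: "group"}, metadata: %{}}
]

# Single mode: returns the first successfully inserted unit.
{:ok, _unit} = ArkePostgres.ArkeLink.insert("my_project", arke, [hd(units)])

# Bulk mode: returns the insert count plus the units split into valid/errors.
{:ok, 2, _valid, []} = ArkePostgres.ArkeLink.insert("my_project", arke, units, bulk: true)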
91 changes: 54 additions & 37 deletions lib/arke_postgres/query.ex
@@ -14,6 +14,7 @@

defmodule ArkePostgres.Query do
import Ecto.Query
require IEx
alias Arke.Utils.DatetimeHandler, as: DatetimeHandler

@record_fields [:id, :arke_id, :data, :metadata, :inserted_at, :updated_at]
@@ -327,60 +328,76 @@ defmodule ArkePostgres.Query do
end
end

defp extract_paths(items) do
items
|> Enum.flat_map(fn
%{base_filters: base_filters} -> Enum.map(base_filters, & &1.path)
%{path: path} -> [path]
_ -> []
end)
defp extract_paths(filters) do
filters
|> Enum.flat_map(&extract_path/1)
|> Enum.reject(&(is_nil(&1) or length(&1) == 0))
|> Enum.uniq()
end

def extract_path(%Arke.Core.Query.Filter{base_filters: base_filters}),
do: Enum.flat_map(base_filters, &extract_path/1)

def extract_path(%Arke.Core.Query.BaseFilter{path: path}), do: [path]
def extract_path(_), do: []

def handle_filters(query, filters) do
Enum.reduce(filters, query, fn %{logic: logic, negate: negate, base_filters: base_filters},
new_query ->
clause = handle_condition(logic, base_filters) |> handle_negate_condition(negate)
from(q in new_query, where: ^clause)
query ->
clause = build_filter_clause(logic, base_filters) |> handle_negate_condition(negate)
from(q in query, where: ^clause)
end)
end

def handle_condition(logic, base_filters) do
Enum.reduce(base_filters, nil, fn %{
parameter: parameter,
operator: operator,
value: value,
negate: negate,
path: path
},
clause ->
if length(path) == 0 do
parameter_condition(clause, parameter, value, operator, negate, logic)
|> add_condition_to_clause(clause, logic)
else
# todo enhance to get multi-level path
path_parameter = List.first(path)

if not is_nil(path_parameter) do
parameter_condition(clause, parameter, value, operator, negate, logic, true)
|> add_nested_condition_to_clause(clause, logic)
end
end
defp build_filter_clause(parent_logic, filters),
do: build_filter_clause(nil, parent_logic, filters)

defp build_filter_clause(clause, parent_logic, filters) do
Enum.reduce(filters, clause, fn
%Arke.Core.Query.Filter{logic: logic, base_filters: nested_filters}, clause ->
nested_clause = build_filter_clause(nil, logic, nested_filters)
add_condition_to_clause(nested_clause, clause, parent_logic)

%Arke.Core.Query.BaseFilter{
parameter: parameter,
operator: operator,
value: value,
negate: negate,
path: []
},
clause ->
parameter_condition(clause, parameter, value, operator, negate, parent_logic)
|> add_condition_to_clause(clause, parent_logic)

%Arke.Core.Query.BaseFilter{
parameter: parameter,
operator: operator,
value: value,
negate: negate,
path: [path_parameter | _]
},
clause
when not is_nil(path_parameter) ->
parameter_condition(clause, parameter, value, operator, negate, parent_logic, true)
|> add_nested_condition_to_clause(clause, parent_logic)

_, clause ->
clause
end)
end

defp parameter_condition(clause, parameter, value, operator, negate, logic, joined \\ nil) do
column = get_column(parameter, joined)
value = get_value(parameter, value)

if is_nil(value) or operator == :isnull do
condition = get_nil_query(parameter, column) |> handle_negate_condition(negate)
else
condition =
condition =
if is_nil(value) or operator == :isnull do
get_nil_query(parameter, column)
else
filter_query_by_operator(parameter, column, value, operator)
|> handle_negate_condition(negate)
end
end

handle_negate_condition(condition, negate)
end

defp handle_negate_condition(condition, true), do: dynamic([q], not (^condition))
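
The build_filter_clause/3 rewrite above recurses into nested Arke.Core.Query.Filter groups, so mixed AND/OR trees collapse into a single Ecto.Query.dynamic/2 clause. A simplified, self-contained sketch of that reduction pattern (plain maps stand in for the Arke filter structs — an assumption made for brevity):

defmodule FilterClauseSketch do
  import Ecto.Query, only: [dynamic: 2]

  # A group is %{logic: :and | :or, filters: [...]}; a leaf is %{field: ..., value: ...}.
  def build(%{logic: logic, filters: filters}) do
    Enum.reduce(filters, nil, fn filter, acc ->
      combine(acc, build(filter), logic)
    end)
  end

  def build(%{field: field, value: value}), do: dynamic([q], field(q, ^field) == ^value)

  defp combine(nil, clause, _logic), do: clause
  defp combine(acc, clause, :and), do: dynamic([q], ^acc and ^clause)
  defp combine(acc, clause, :or), do: dynamic([q], ^acc or ^clause)
end

# status == "active" AND (role == "admin" OR role == "owner")
FilterClauseSketch.build(%{
  logic: :and,
  filters: [
    %{field: :status, value: "active"},
    %{logic: :or,
      filters: [%{field: :role, value: "admin"}, %{field: :role, value: "owner"}]}
  ]
})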
10 changes: 6 additions & 4 deletions lib/arke_postgres/tables/arke_link.ex
@@ -12,20 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule ArkePostgres.ArkeLink do
defmodule ArkePostgres.Tables.ArkeLink do
use Ecto.Schema
import Ecto.Changeset

@arke_link_fields ~w[type parent_id child_id metadata]a

@foreign_key_type :string
schema "arke_link" do
field(:type, :string, default: "link")
belongs_to(:parent_id, ArkePostgres.ArkeUnit, primary_key: true)
belongs_to(:child_id, ArkePostgres.ArkeUnit, primary_key: true)
field(:type, :string, default: "link", primary_key: true)
belongs_to(:parent, ArkePostgres.ArkeUnit, primary_key: true, foreign_key: :parent_id)
belongs_to(:child, ArkePostgres.ArkeUnit, primary_key: true, foreign_key: :child_id)
field(:metadata, :map, default: %{})
end

# TODO: add insert_all validation

def changeset(args \\ []) do
%__MODULE__{}
|> cast(args, @arke_link_fields)
2 changes: 2 additions & 0 deletions lib/arke_postgres/tables/arke_unit.ex
@@ -27,6 +27,8 @@ defmodule ArkePostgres.Tables.ArkeUnit do
timestamps()
end

# TODO: add insert_all validation

def changeset(args \\ []) do
%__MODULE__{}
|> cast(args, @arke_record_fields)
28 changes: 18 additions & 10 deletions priv/repo/migrations/20220610104406_initial_migration.exs
@@ -3,21 +3,29 @@ defmodule ArkePostgres.Repo.Migrations.InitialMigration do

def change do
create table(:arke_unit, primary_key: false) do
add :id, :string, primary_key: true
add :arke_id, :string, null: false
add :data, :map, default: %{}, null: false
add :metadata, :map, default: %{}, null: false
add(:id, :string, primary_key: true)
add(:arke_id, :string, null: false)
add(:data, :map, default: %{}, null: false)
add(:metadata, :map, default: %{}, null: false)
timestamps()
end

create table(:arke_link, primary_key: false) do
add :type, :string, default: "link", null: false
add :parent_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all), primary_key: true
add :child_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all), primary_key: true
add :metadata, :map, default: %{}
add(:type, :string, default: "link", null: false, primary_key: true)

add(:parent_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all),
primary_key: true
)

add(:child_id, references(:arke_unit, column: :id, type: :string, on_delete: :delete_all),
primary_key: true
)

add(:metadata, :map, default: %{})
end

create index(:arke_link, :parent_id)
create index(:arke_link, :child_id)
create(index(:arke_unit, :arke_id))
create(index(:arke_link, :parent_id))
create(index(:arke_link, :child_id))
end
end
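
Because type, parent_id, and child_id now form the table's composite primary key, ArkeLink.update/4 can upsert links on that key. A minimal sketch of the insert_all upsert call it builds (the project prefix and record values are illustrative):

records = [
  %{type: "group", parent_id: "user_1", child_id: "group_1", metadata: %{"role" => "admin"}}
]

ArkePostgres.Repo.insert_all(
  ArkePostgres.Tables.ArkeLink,
  records,
  prefix: "my_project",
  returning: [:child_id, :parent_id, :type],
  # On a (type, parent_id, child_id) collision, overwrite the remaining columns (metadata).
  on_conflict: {:replace_all_except, [:type, :parent_id, :child_id, :id]},
  conflict_target: [:type, :parent_id, :child_id]
)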