Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/queryable-columns.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Add `queryable_columns` as a server-side shape allow-list for columns that may be queried or synced. This lets proxies decouple the `columns` sync projection from the security boundary while preventing `where`, subset filters and ordering, and projected columns from referencing non-queryable columns.
3 changes: 3 additions & 0 deletions packages/sync-service/lib/electric/shapes/api/params.ex
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ defmodule Electric.Shapes.Api.Params do
field(:live, :boolean, default: false)
field(:where, :string)
field(:columns, ColumnList)
field(:queryable_columns, ColumnList)
field(:shape_definition, :string)
field(:replica, Ecto.Enum, values: [:default, :full], default: :default)
field(:params, {:map, :string}, default: %{})
Expand Down Expand Up @@ -319,6 +320,7 @@ defmodule Electric.Shapes.Api.Params do
table = fetch_change!(changeset, :table)
where = fetch_field!(changeset, :where)
columns = get_change(changeset, :columns, nil)
queryable_columns = get_change(changeset, :queryable_columns, nil)
replica = fetch_field!(changeset, :replica)
params = fetch_field!(changeset, :params)
compaction_enabled? = fetch_field!(changeset, @tmp_compaction_flag)
Expand All @@ -328,6 +330,7 @@ defmodule Electric.Shapes.Api.Params do
where: where,
params: params,
columns: columns,
queryable_columns: queryable_columns,
replica: replica,
inspector: api.inspector,
feature_flags: api.feature_flags,
Expand Down
81 changes: 73 additions & 8 deletions packages/sync-service/lib/electric/shapes/shape.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ defmodule Electric.Shapes.Shape do
:where,
:selected_columns,
:explicitly_selected_columns,
:queryable_columns,
shape_dependencies: [],
shape_dependencies_handles: [],
tag_structure: [],
Expand Down Expand Up @@ -69,6 +70,7 @@ defmodule Electric.Shapes.Shape do
where: Electric.Replication.Eval.Expr.t() | nil,
selected_columns: [String.t(), ...],
explicitly_selected_columns: [String.t(), ...],
queryable_columns: [String.t(), ...] | nil,
tag_structure: [String.t() | [String.t(), ...]],
replica: replica(),
storage: storage_config() | nil,
Expand Down Expand Up @@ -163,6 +165,7 @@ defmodule Electric.Shapes.Shape do
relation: [type: {:tuple, [:string, :string]}, required: true],
where: [type: :any],
columns: [type: {:or, [{:list, :string}, nil]}],
queryable_columns: [type: {:or, [{:list, :string}, nil]}],
params: [type: {:map, :string, :string}, default: %{}],
autofill_pk_select?: [type: :boolean, default: false],
replica: [
Expand Down Expand Up @@ -230,14 +233,25 @@ defmodule Electric.Shapes.Shape do
{:ok, {oid, table} = relation} <- validate_relation(opts, inspector),
{:ok, column_info, pk_cols} <- load_column_info(relation, inspector),
{:ok, supported_features} <- load_supported_features(inspector),
{:ok, queryable_columns} <- validate_queryable_columns(column_info, pk_cols, opts),
{:ok, selected_columns, explicitly_selected_columns} <-
validate_selected_columns(column_info, pk_cols, supported_features, opts),
refs = Inspector.columns_to_expr(column_info),
validate_selected_columns(
column_info,
pk_cols,
supported_features,
Map.put(opts, :queryable_columns, queryable_columns)
),
refs =
column_info
|> filter_columns(queryable_columns)
|> Inspector.columns_to_expr(),
{:ok, where, shape_dependencies} <-
validate_where_clause(Map.get(opts, :where), opts, refs) do
flags =
[
if(is_nil(Map.get(opts, :columns)), do: :selects_all_columns),
if(is_nil(Map.get(opts, :columns)) and is_nil(Map.get(opts, :queryable_columns)),
do: :selects_all_columns
),
if(any_columns_generated?(column_info, selected_columns),
do: :selects_generated_columns
),
Expand All @@ -258,6 +272,7 @@ defmodule Electric.Shapes.Shape do
where: where,
selected_columns: selected_columns,
explicitly_selected_columns: explicitly_selected_columns,
queryable_columns: queryable_columns,
replica: Map.get(opts, :replica, :default),
storage: Map.get(opts, :storage) || %{compaction: :disabled},
shape_dependencies: shape_dependencies,
Expand Down Expand Up @@ -322,7 +337,7 @@ defmodule Electric.Shapes.Shape do
end

defp build_shape_dependencies(subqueries, opts) do
shared_opts = Map.drop(opts, [:where, :columns, :relation])
shared_opts = Map.drop(opts, [:where, :columns, :queryable_columns, :relation])

subqueries
|> Enum.with_index()
Expand Down Expand Up @@ -461,7 +476,9 @@ defmodule Electric.Shapes.Shape do
autofill_pk_select? = Map.fetch!(opts, :autofill_pk_select?)

missing_pk_cols = pk_cols -- columns_to_select
invalid_cols = columns_to_select -- Enum.map(column_info, & &1.name)
queryable_columns = Map.fetch!(opts, :queryable_columns)
selectable_column_names = queryable_columns || Enum.map(column_info, & &1.name)
invalid_cols = columns_to_select -- selectable_column_names
generated_cols = Enum.filter(column_info, &(&1.is_generated and &1.name in columns_to_select))

err_msg =
Expand Down Expand Up @@ -499,12 +516,13 @@ defmodule Electric.Shapes.Shape do
column_info,
_pk_cols,
%{supports_generated_column_replication: supports_generated_column_replication},
_opts
opts
) do
generated_cols = Enum.filter(column_info, & &1.is_generated)
columns_to_select = Map.fetch!(opts, :queryable_columns) || Enum.map(column_info, & &1.name)
generated_cols = Enum.filter(column_info, &(&1.is_generated and &1.name in columns_to_select))

if generated_cols == [] or supports_generated_column_replication do
all_columns = column_info |> Enum.map(& &1.name) |> Enum.sort()
all_columns = Enum.sort(columns_to_select)
{:ok, all_columns, all_columns}
else
err_msg =
Expand All @@ -517,6 +535,50 @@ defmodule Electric.Shapes.Shape do
end
end

defp validate_queryable_columns(column_info, pk_cols, opts) when is_map(opts) do
case Map.get(opts, :queryable_columns) do
nil -> {:ok, nil}
queryable_columns -> validate_queryable_columns(column_info, pk_cols, queryable_columns)
end
end

defp validate_queryable_columns(column_info, pk_cols, queryable_columns)
when is_list(queryable_columns) do
all_column_names = Enum.map(column_info, & &1.name)

missing_pk_cols = pk_cols -- queryable_columns
invalid_cols = queryable_columns -- all_column_names

err_msg =
cond do
missing_pk_cols != [] ->
"The list of queryable columns must include all primary key columns, missing: " <>
Enum.join(missing_pk_cols, ", ")

invalid_cols != [] ->
"The following queryable columns are not found on the table: " <>
Enum.join(invalid_cols, ", ")

queryable_columns == [] ->
"The list of queryable columns must not be empty"

true ->
nil
end

if is_nil(err_msg) do
{:ok, Enum.sort(queryable_columns)}
else
{:error, {:queryable_columns, [err_msg]}}
end
end

defp filter_columns(column_info, nil), do: column_info

defp filter_columns(column_info, column_names) do
Enum.filter(column_info, &(&1.name in column_names))
end

defp table_not_found_error(relation),
do:
{:error,
Expand Down Expand Up @@ -985,6 +1047,7 @@ defmodule Electric.Shapes.Shape do
where: shape.where,
selected_columns: shape.selected_columns,
explicitly_selected_columns: shape.explicitly_selected_columns,
queryable_columns: shape.queryable_columns,
storage: shape.storage,
replica: shape.replica,
shape_dependencies: Enum.map(shape.shape_dependencies, &to_json_safe/1),
Expand Down Expand Up @@ -1025,6 +1088,7 @@ defmodule Electric.Shapes.Shape do
selected_columns: selected_columns,
explicitly_selected_columns:
Map.get(data, "explicitly_selected_columns", selected_columns),
queryable_columns: Map.get(data, "queryable_columns"),
storage: storage_config_from_json(storage),
replica: String.to_existing_atom(replica),
shape_dependencies: shape_dependencies,
Expand Down Expand Up @@ -1095,6 +1159,7 @@ defmodule Electric.Shapes.Shape do
flags: flags,
where: where,
selected_columns: actual_selected_columns,
queryable_columns: Map.get(data, "queryable_columns"),
replica: String.to_atom(Map.get(data, "replica", "default")),
storage: storage_config_from_json(Map.get(data, "storage"))
}}
Expand Down
7 changes: 5 additions & 2 deletions packages/sync-service/lib/electric/shapes/shape/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Electric.Shapes.Shape.Subset do
end
end

defp load_column_info(%{root_table_id: table_oid, root_table: table}, inspector) do
defp load_column_info(%{root_table_id: table_oid, root_table: table} = shape, inspector) do
case Inspector.load_column_info(table_oid, inspector) do
:table_not_found ->
{:error,
Expand All @@ -53,7 +53,10 @@ defmodule Electric.Shapes.Shape.Subset do
"If the table name contains capitals or special characters you must quote it."}}

{:ok, columns} ->
{:ok, columns}
case shape.queryable_columns do
nil -> {:ok, columns}
queryable_columns -> {:ok, Enum.filter(columns, &(&1.name in queryable_columns))}
end
end
end

Expand Down
140 changes: 140 additions & 0 deletions packages/sync-service/test/electric/plug/router_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,95 @@ defmodule Electric.Plug.RouterTest do
assert [%{"key" => ^key, "value" => ^value}, _] = Jason.decode!(conn.resp_body)
end

@tag with_sql: [
"CREATE TABLE queryable_users (id BIGINT PRIMARY KEY, name TEXT NOT NULL, secret_token TEXT NOT NULL)",
"INSERT INTO queryable_users VALUES (1, 'Alice', 'supersecret123')"
]
test "GET allows where clauses on queryable columns that are not selected", %{opts: opts} do
conn =
conn("GET", "/v1/shape", %{
table: "queryable_users",
offset: "-1",
queryable_columns: "id,name,secret_token",
columns: "id,name",
where: "secret_token LIKE 'super%'"
})
|> Router.call(opts)

assert %{status: 200} = conn

assert [
%{
"headers" => %{"operation" => "insert"},
"value" => %{"id" => "1", "name" => "Alice"}
},
%{"headers" => %{"control" => "snapshot-end"}}
] = Jason.decode!(conn.resp_body)
end

@tag with_sql: [
"CREATE TABLE queryable_users (id BIGINT PRIMARY KEY, name TEXT NOT NULL, secret_token TEXT NOT NULL)",
"INSERT INTO queryable_users VALUES (1, 'Alice', 'supersecret123')"
]
test "GET rejects where clauses and selected columns outside queryable_columns", %{opts: opts} do
conn =
conn("GET", "/v1/shape", %{
table: "queryable_users",
offset: "-1",
queryable_columns: "id,name",
columns: "id,name",
where: "secret_token LIKE 'super%'"
})
|> Router.call(opts)

assert %{status: 400} = conn
assert %{"errors" => %{"where" => [message]}} = Jason.decode!(conn.resp_body)
assert message =~ "unknown reference secret_token"

conn =
conn("GET", "/v1/shape", %{
table: "queryable_users",
offset: "-1",
queryable_columns: "id,name",
columns: "id,name,secret_token"
})
|> Router.call(opts)

assert %{status: 400} = conn

assert %{
"errors" => %{
"columns" => ["The following columns are not found on the table: secret_token"]
}
} = Jason.decode!(conn.resp_body)
end

@tag with_sql: [
"CREATE TABLE queryable_users (id BIGINT PRIMARY KEY, name TEXT NOT NULL, secret_token TEXT NOT NULL)",
"INSERT INTO queryable_users VALUES (1, 'Alice', 'supersecret123')"
]
test "GET defaults selected columns to queryable_columns when columns are omitted", %{
opts: opts
} do
conn =
conn("GET", "/v1/shape", %{
table: "queryable_users",
offset: "-1",
queryable_columns: "id,name"
})
|> Router.call(opts)

assert %{status: 200} = conn

assert [
%{
"headers" => %{"operation" => "insert"},
"value" => %{"id" => "1", "name" => "Alice"}
},
%{"headers" => %{"control" => "snapshot-end"}}
] = Jason.decode!(conn.resp_body)
end

test "GET works when there are changes not related to the shape in the same txn", %{
opts: opts,
db_conn: db_conn
Expand Down Expand Up @@ -3477,6 +3566,57 @@ defmodule Electric.Plug.RouterTest do
)
end

@tag with_sql: [
"CREATE TABLE queryable_users (id BIGINT PRIMARY KEY, name TEXT NOT NULL, secret_token TEXT NOT NULL)",
"INSERT INTO queryable_users VALUES (1, 'Alice', 'supersecret123')",
"INSERT INTO queryable_users VALUES (2, 'Bob', 'public')"
]
test "subset snapshots allow subset__where on queryable columns that are not selected", ctx do
conn =
conn("GET", "/v1/shape", %{
"table" => "queryable_users",
"offset" => "-1",
"queryable_columns" => "id,name,secret_token",
"columns" => "id,name",
"subset__where" => "secret_token = 'supersecret123'"
})
|> Router.call(ctx.opts)

assert %{status: 200} = conn

assert %{
"metadata" => _,
"data" => [
%{
"value" => %{"id" => "1", "name" => "Alice"}
}
]
} = Jason.decode!(conn.resp_body)
end

@tag with_sql: [
"CREATE TABLE queryable_users (id BIGINT PRIMARY KEY, name TEXT NOT NULL, secret_token TEXT NOT NULL)",
"INSERT INTO queryable_users VALUES (1, 'Alice', 'supersecret123')"
]
test "subset snapshots reject subset__where clauses outside queryable_columns", ctx do
conn =
conn("GET", "/v1/shape", %{
"table" => "queryable_users",
"offset" => "-1",
"queryable_columns" => "id,name",
"columns" => "id,name",
"subset__where" => "secret_token = 'supersecret123'"
})
|> Router.call(ctx.opts)

assert %{status: 400} = conn

assert %{"errors" => %{"subset" => %{"where" => [message]}}} =
Jason.decode!(conn.resp_body)

assert message =~ "unknown reference secret_token"
end

@tag with_sql: [
"INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')",
"INSERT INTO items VALUES (gen_random_uuid(), 'test value 2')"
Expand Down
Loading
Loading