Skip to content

Commit

Permalink
fix(electric): Add python psycopg3 support (#1375)
Browse files Browse the repository at this point in the history
psycopg sends begin, commit and rollback statements using the extended,
parse, bind, execute, protocol, which is... different
  • Loading branch information
magnetised committed Jun 18, 2024
1 parent b700551 commit 8cddaaf
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/wild-buckets-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/electric": patch
---

Support psycopg3 style transactions in proxy
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,21 @@ defmodule Electric.Postgres.Proxy.Injector.Electric do

{[Operation.Wait.new(msgs, state, signal), stack], {electric, state}}

# psycopg sends its txn commands using the extended protocol, annoyingly
# it uses a [parse, describe, bind, execute, sync] message block, so all we
# need to do is pass that on and mark the connection as in a transaction
%{action: {:tx, action}} = _analysis when action in [:begin, :rollback, :commit] ->
state = State.transaction(state, action)

op =
if Enum.any?(msgs, &is_struct(&1, M.Execute)) do
Operation.Wait.new(msgs, state)
else
%Operation.BindExecute{ops: []}
end

{[op], {electric, state}}

analysis ->
bind = %Operation.BindExecute{
ops: Electric.command_from_analysis([], analysis, state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ defmodule Electric.Postgres.Proxy.Injector.State do
not is_nil(state.tx)
end

def transaction(%__MODULE__{} = state, action) when action in [:begin, :rollback, :commit] do
case action do
:begin -> begin(state)
:rollback -> rollback(state)
:commit -> commit(state)
end
end

@doc """
Update the transaction status to mark it as affecting electrified tables (or
not).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ defimpl QueryAnalyser, for: PgQuery.TransactionStmt do
:TRANS_STMT_BEGIN -> :begin
:TRANS_STMT_COMMIT -> :commit
:TRANS_STMT_ROLLBACK -> :rollback
:TRANS_STMT_SAVEPOINT -> :savepoint
:TRANS_STMT_RELEASE -> :release
end

%{analysis | action: {:tx, kind}}
Expand Down
127 changes: 127 additions & 0 deletions components/electric/test/electric/postgres/proxy/injector_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,133 @@ defmodule Electric.Postgres.Proxy.InjectorTest do
|> server(complete_ready("COMMIT", :idle))
|> idle!()
end

test "psycopg transactions", cxt do
cxt.injector
|> client([
%M.Parse{query: "BEGIN"},
%M.Bind{},
%M.Describe{},
%M.Execute{},
%M.Sync{}
])
|> server([
%M.ParseComplete{},
%M.BindComplete{},
%M.NoData{},
%M.CommandComplete{tag: "BEGIN"},
%M.ReadyForQuery{status: :tx}
])
|> client(%M.Query{query: "select pg_catalog.version()"})
|> server([
%M.RowDescription{
fields: [
%PgProtocol.Message.RowDescription.Field{
name: "version",
oid: 0,
attnum: 0,
type: 25,
typlen: -1,
typmod: -1,
fmt: 0
}
]
},
%M.DataRow{
fields: [
"PostgreSQL 14.12 on x86_64-pc-linux-musl, compiled by gcc (Alpine 13.2.1_git20240309) 13.2.1 20240309, 64-bit"
]
},
%M.CommandComplete{tag: "SELECT 1"},
%M.ReadyForQuery{status: :tx}
])
|> client([
%M.Parse{name: "", query: "ROLLBACK", params: []},
%M.Bind{
portal: "",
source: "",
parameters: [],
parameter_format_codes: [],
result_format_codes: [0]
},
%M.Describe{type: "P", name: ""},
%M.Execute{portal: "", max_rows: 0},
%M.Sync{}
])
|> server([
%M.ParseComplete{},
%M.BindComplete{},
%M.NoData{},
%M.CommandComplete{tag: "ROLLBACK"},
%M.ReadyForQuery{status: :idle}
])
end

test "psycopg savepoints", cxt do
cxt.injector
|> client([
%M.Parse{query: "BEGIN"},
%M.Bind{},
%M.Describe{},
%M.Execute{},
%M.Sync{}
])
|> server([
%M.ParseComplete{},
%M.BindComplete{},
%M.NoData{},
%M.CommandComplete{tag: "BEGIN"},
%M.ReadyForQuery{status: :tx}
])
|> client([
%M.Parse{name: "", query: "SAVEPOINT \"_pg3_1\"", params: []},
%M.Bind{},
%M.Describe{type: "P", name: ""},
%M.Execute{portal: "", max_rows: 0},
%M.Sync{}
])
|> server([
%M.ParseComplete{},
%M.BindComplete{},
%M.NoData{},
%M.CommandComplete{tag: "SAVEPOINT"},
%M.ReadyForQuery{status: :tx}
])
|> client([
%M.Parse{name: "", query: "RELEASE \"_pg3_1\"", params: []},
%M.Bind{},
%M.Describe{type: "P", name: ""},
%M.Execute{portal: "", max_rows: 0},
%M.Sync{}
])
|> server([
%M.ParseComplete{},
%M.BindComplete{},
%M.NoData{},
%M.CommandComplete{tag: "RELEASE"},
%M.ReadyForQuery{status: :tx}
])
|> client([
%M.Parse{name: "", query: "ROLLBACK", params: []},
%M.Bind{
portal: "",
source: "",
parameters: [],
parameter_format_codes: [],
result_format_codes: [0]
},
%M.Describe{type: "P", name: ""},
%M.Execute{portal: "", max_rows: 0},
%M.Sync{}
])
|> server([
%M.ParseComplete{},
%M.BindComplete{},
%M.NoData{},
%M.CommandComplete{tag: "ROLLBACK"},
%M.ReadyForQuery{status: :idle}
])
end
end

describe "Injector.Electric" do
Expand Down

0 comments on commit 8cddaaf

Please sign in to comment.