/
stream.ex
130 lines (107 loc) · 3.38 KB
/
stream.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
defmodule Postgrex.Stream do
@moduledoc """
Stream struct returned from stream commands.
All of its fields are private.
"""
@derive {Inspect, only: []}
defstruct [:conn, :query, :params, :options]
@type t :: %Postgrex.Stream{}
end
defmodule Postgrex.Cursor do
@moduledoc false
defstruct [:portal, :ref, :connection_id, :mode]
@type t :: %Postgrex.Cursor{}
end
defmodule Postgrex.Copy do
@moduledoc false
defstruct [:portal, :ref, :connection_id, :query]
@type t :: %Postgrex.Copy{}
end
defimpl Enumerable, for: Postgrex.Stream do
alias Postgrex.Query
def reduce(%Postgrex.Stream{query: %Query{} = query} = stream, acc, fun) do
%Postgrex.Stream{conn: conn, params: params, options: opts} = stream
stream = %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts}
DBConnection.reduce(stream, acc, fun)
end
def reduce(%Postgrex.Stream{query: statement} = stream, acc, fun) do
%Postgrex.Stream{conn: conn, params: params, options: opts} = stream
query = %Query{name: "", statement: statement}
opts = Keyword.put(opts, :function, :prepare_open)
stream = %DBConnection.PrepareStream{conn: conn, query: query, params: params, opts: opts}
DBConnection.reduce(stream, acc, fun)
end
def member?(_, _) do
{:error, __MODULE__}
end
def count(_) do
{:error, __MODULE__}
end
def slice(_) do
{:error, __MODULE__}
end
end
defimpl Collectable, for: Postgrex.Stream do
alias Postgrex.Stream
alias Postgrex.Query
def into(%Stream{conn: %DBConnection{}} = stream) do
%Stream{conn: conn, query: query, params: params, options: opts} = stream
opts = Keyword.put(opts, :postgrex_copy, true)
case query do
%Query{} ->
copy = DBConnection.execute!(conn, query, params, opts)
{:ok, make_into(conn, stream, copy, opts)}
query ->
query = %Query{name: "", statement: query}
{_, copy} = DBConnection.prepare_execute!(conn, query, params, opts)
{:ok, make_into(conn, stream, copy, opts)}
end
end
def into(_) do
raise ArgumentError, "data can only be copied to database inside a transaction"
end
defp make_into(conn, stream, %Postgrex.Copy{ref: ref} = copy, opts) do
fn
:ok, {:cont, data} ->
_ = DBConnection.execute!(conn, copy, {:copy_data, ref, data}, opts)
:ok
:ok, close when close in [:done, :halt] ->
_ = DBConnection.execute!(conn, copy, {:copy_done, ref}, opts)
stream
end
end
end
defimpl DBConnection.Query, for: Postgrex.Copy do
alias Postgrex.Copy
import Postgrex.Messages
def parse(copy, _) do
raise "can not prepare #{inspect(copy)}"
end
def describe(copy, _) do
raise "can not describe #{inspect(copy)}"
end
def encode(%Copy{ref: ref}, {:copy_data, ref, data}, _) do
try do
encode_msg(msg_copy_data(data: data))
rescue
ArgumentError ->
reraise ArgumentError,
[message: "expected iodata to copy to database, got: " <> inspect(data)],
__STACKTRACE__
else
iodata ->
{:copy_data, iodata}
end
end
def encode(%Copy{ref: ref}, {:copy_done, ref}, _) do
:copy_done
end
def decode(copy, _result, _opts) do
raise "can not describe #{inspect(copy)}"
end
end
defimpl String.Chars, for: Postgrex.Copy do
def to_string(%Postgrex.Copy{query: query}) do
String.Chars.to_string(query)
end
end