/
cache.ex
209 lines (174 loc) · 6.69 KB
/
cache.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
defmodule Ecto.Adapters.DynamoDB.Cache do
@moduledoc """
An Elixir agent to cache DynamoDB table schemas and the first page of results for selected tables
"""
@typep table_name_t :: String.t()
@typep dynamo_response_t :: %{required(String.t()) => term}
alias Confex.Resolver
alias Ecto.Adapters.DynamoDB
alias Ecto.Repo
defstruct [
:schemas,
:tables,
:ex_aws_config
]
@type cached_table :: {String.t(), map()}
@type t :: %__MODULE__{
schemas: map(),
tables: [cached_table()]
}
def child_spec([repo]) do
%{
id: repo,
start: {__MODULE__, :start_link, [repo]}
}
end
@spec start_link(Repo.t()) :: Agent.on_start()
def start_link(repo) do
cached_table_list =
repo.config()
|> Resolver.resolve!()
|> Keyword.get(:cached_tables, [])
Agent.start_link(
fn ->
%__MODULE__{
schemas: %{},
tables: for(table_name <- cached_table_list, into: %{}, do: {table_name, nil}),
ex_aws_config: DynamoDB.ex_aws_config(repo)
}
end,
name: agent(repo)
)
end
@doc """
Returns the cached value for a call to DynamoDB, describe-table. Performs a DynamoDB scan if not yet cached and raises any errors as a result of the request. The raw json is presented as an elixir map.
"""
@spec describe_table!(Repo.t(), table_name_t) :: dynamo_response_t | no_return
def describe_table!(repo, table_name) do
case describe_table(repo, table_name) do
{:ok, schema} -> schema
{:error, error} -> raise error.type, message: error.message
end
end
@spec describe_table(Repo.t(), table_name_t) :: {:ok, dynamo_response_t} | {:error, term}
def describe_table(repo, table_name),
do: Agent.get_and_update(agent(repo), &do_describe_table(&1, table_name))
@doc """
Performs a DynamoDB, describe-table, and caches (without returning) the result. Raises any errors as a result of the request
"""
@spec update_table_info!(Repo.t(), table_name_t) :: :ok | no_return
def update_table_info!(repo, table_name) do
case update_table_info(repo, table_name) do
:ok -> :ok
{:error, error} -> raise error.type, message: error.message
end
end
@spec update_table_info(Repo.t(), table_name_t) :: :ok | {:error, term}
def update_table_info(repo, table_name),
do: Agent.get_and_update(agent(repo), &do_update_table_info(&1, table_name))
@doc """
Returns the cached first page of results for a table. Performs a DynamoDB scan if not yet cached and raises any errors as a result of the request
"""
@spec scan!(Repo.t(), table_name_t) :: dynamo_response_t | no_return
def scan!(repo, table_name) do
case scan(repo, table_name) do
{:ok, scan_result} -> scan_result
{:error, error} -> raise error.type, message: error.message
end
end
@spec scan(Repo.t(), table_name_t) :: {:ok, dynamo_response_t} | {:error, term}
def scan(repo, table_name),
do: Agent.get_and_update(agent(repo), &do_scan(&1, table_name))
@doc """
Performs a DynamoDB scan and caches (without returning) the first page of results. Raises any errors as a result of the request
"""
@spec update_cached_table!(Repo.t(), table_name_t) :: :ok | no_return
def update_cached_table!(repo, table_name) do
case update_cached_table(repo, table_name) do
:ok -> :ok
{:error, error} -> raise error.type, message: error.message
end
end
@spec update_cached_table(Repo.t(), table_name_t) :: :ok | {:error, term}
def update_cached_table(repo, table_name),
do: Agent.get_and_update(agent(repo), &do_update_cached_table(&1, table_name))
@doc """
Returns the current cache of table schemas, and cache of first page of results for selected tables, as an Elixir map
"""
# For testing and debugging use only:
def get_cache(repo),
do: Agent.get(agent(repo), & &1)
defp do_describe_table(cache, table_name) do
case cache.schemas[table_name] do
nil ->
result = ExAws.Dynamo.describe_table(table_name) |> ExAws.request(cache.ex_aws_config)
case result do
{:ok, %{"Table" => schema}} ->
updated_cache = put_in(cache.schemas[table_name], schema)
{{:ok, schema}, updated_cache}
{:error, error} ->
{{:error, %{type: ExAws.Error, message: "ExAws Request Error! #{inspect(error)}"}},
cache}
end
schema ->
{{:ok, schema}, cache}
end
end
defp do_update_table_info(cache, table_name) do
result = ExAws.Dynamo.describe_table(table_name) |> ExAws.request(cache.ex_aws_config)
case result do
{:ok, %{"Table" => schema}} ->
updated_cache = put_in(cache.schemas[table_name], schema)
{:ok, updated_cache}
{:error, error} ->
{{:error, %{type: ExAws.Error, message: "ExAws Request Error! #{inspect(error)}"}}, cache}
end
end
defp do_scan(cache, table_name) do
table_name_in_config = Map.has_key?(cache.tables, table_name)
case cache.tables[table_name] do
nil when table_name_in_config ->
result = ExAws.Dynamo.scan(table_name) |> ExAws.request(cache.ex_aws_config)
case result do
{:ok, scan_result} ->
updated_cache = put_in(cache.tables[table_name], scan_result)
{{:ok, scan_result}, updated_cache}
{:error, error} ->
{{:error, %{type: ExAws.Error, message: "ExAws Request Error! #{inspect(error)}"}},
cache}
end
nil ->
{{:error,
%{
type: ArgumentError,
message:
"Could not confirm the table, #{inspect(table_name)}, as listed for caching in the application's configuration. Please see README file for details."
}}, cache}
cached_scan ->
{{:ok, cached_scan}, cache}
end
end
defp do_update_cached_table(cache, table_name) do
table_name_in_config = Map.has_key?(cache.tables, table_name)
case cache.tables[table_name] do
nil when not table_name_in_config ->
{{:error,
%{
type: ArgumentError,
message:
"Could not confirm the table, #{inspect(table_name)}, as listed for caching in the application's configuration. Please see README file for details."
}}, cache}
_ ->
result = ExAws.Dynamo.scan(table_name) |> ExAws.request(cache.ex_aws_config)
case result do
{:ok, scan_result} ->
updated_cache = put_in(cache.tables[table_name], scan_result)
{:ok, updated_cache}
{:error, error} ->
{{:error, %{type: ExAws.Error, message: "ExAws Request Error! #{inspect(error)}"}},
cache}
end
end
end
defp agent(repo), do: Module.concat(repo, Cache)
end