Skip to content

Commit

Permalink
Initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
craigp committed Jul 26, 2016
1 parent ce9eb5b commit e31cea7
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 29 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/_build
/cover
/deps
/doc
erl_crash.dump
*.ez
*.beam
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ language: elixir
env:
- MIX_ENV=test
elixir:
- 1.2.0
- 1.3.0
otp_release:
- 18.2.1
script:
Expand Down
22 changes: 22 additions & 0 deletions lib/gmail/message/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Gmail.Message.Supervisor do

@moduledoc """
Supervises worker processes for fetching messages.
"""

use Supervisor

@doc false
def start_link do
Supervisor.start_link __MODULE__, :ok, name: __MODULE__
end

@doc false
def init(:ok) do
[
worker(Gmail.Message.Worker, [], restart: :transient)
] |> supervise(strategy: :simple_one_for_one)
end

end

140 changes: 140 additions & 0 deletions lib/gmail/message/worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
defmodule Gmail.Message.Worker do

@moduledoc """
A worker for fetching messages.
"""

use GenServer
use Timex
require Logger

@ttl 10
@tick_interval 1000

alias Gmail.{Message, User}

@doc false
def start_link(message_id, %{user_id: user_id} = state) do
state = Map.put(state, :message_id, message_id)
GenServer.start_link(__MODULE__, state, name: build_tag(user_id, message_id))
end

@doc """
Initialises the process with a supplied TTL rather than the default TTL.
"""
def init(%{ttl: ttl, user_id: user_id} = state) do
state = Map.put(state, :ttl, Time.to_seconds(Time.now) + ttl)
Process.send_after(self, :tick, @tick_interval)
Logger.debug "Subscribing to parent"
Gmail.User.subscribe(user_id, self)
{:ok, state}
end

@doc """
Initialises the process with the default TTL.
"""
def init(%{user_id: user_id} = state) do
state = Map.put(state, :ttl, Time.to_seconds(Time.now) + @ttl)
Process.send_after(self, :tick, @tick_interval)
Logger.debug "Subscribing to parent"
Gmail.User.subscribe(user_id, self)
{:ok, state}
end

@doc false
def handle_info(:tick, %{ttl: ttl} = state) do
if Time.to_seconds(Time.now) - ttl > @ttl do
GenServer.cast(self, :stop)
else
Process.send_after(self, :tick, @tick_interval)
end
{:noreply, state}
end

@doc false
def handle_cast(:stop, state) do
{:stop, :normal, state}
end

@doc false
def terminate(reason, %{message_id: message_id}) do
Logger.debug "Stopping process for message #{message_id} (#{reason})"
end

@doc false
def handle_call({:get, params}, _from, %{message: message, last_params: params} = state) do
state = update_ttl(state)
{:reply, {:ok, message}, state}
end

@doc false
def handle_call({:get, params}, _from, %{message_id: message_id, user_id: user_id} = state) do
result =
user_id
|> Message.get(message_id, params)
|> User.http_execute(state)
|> Message.handle_message_response
case result do
{:ok, message} ->
Logger.debug "Caching result for message #{message_id}"
state = Map.merge(state, %{message: message, last_params: params})
_otherwise ->
:noop # TODO fix this, it's awful
end
state = update_ttl(state)
{:reply, result, state}
end

@doc """
Gets a message.
"""
@spec get(String.t, map, map) :: {atom, map} | {atom, String.t}
def get(message_id, params, state) do
message_id
|> ensure_server_started(state)
|> GenServer.call({:get, params})
end

@doc """
Fetches multiple messages in parallel.
"""
@spec fetch({pid, reference}, list, map, map) :: :ok
def fetch(from, message_ids, params, state) do
messages =
message_ids
|> Enum.map(fn id ->
Task.async(fn ->
{:ok, message} = get(id, params, state)
message
end)
end)
|> Enum.map(&Task.await/1)
GenServer.reply(from, {:ok, messages})
end

@spec update_ttl(map) :: map
defp update_ttl(%{message_id: message_id} = state) do
Logger.debug("Updating ttl for message process #{message_id}")
Map.put(state, :ttl, Time.to_seconds(Time.now) + @ttl)
end

@spec ensure_server_started(String.t, map) :: pid
defp ensure_server_started(message_id, %{user_id: user_id} = state) do
pid = user_id
|> build_tag(message_id)
|> Process.whereis
if pid do
Logger.debug "Message process found for #{message_id}"
pid
else
Logger.debug "Starting message process for #{message_id}"
{:ok, pid} = Supervisor.start_child(Gmail.Message.Supervisor, [message_id, state])
pid
end
end

@spec build_tag(String.t, String.t) :: atom
defp build_tag(user_id, message_id) do
String.to_atom("#{user_id}_#{message_id}")
end
end
3 changes: 2 additions & 1 deletion lib/gmail/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule Gmail.Supervisor do
def init(:ok) do
children = [
supervisor(Gmail.UserManager, []),
supervisor(Gmail.Thread.Supervisor, [])
supervisor(Gmail.Thread.Supervisor, []),
supervisor(Gmail.Message.Supervisor, [])
]
supervise(children, strategy: :one_for_one)
end
Expand Down
37 changes: 23 additions & 14 deletions lib/gmail/user.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ defmodule Gmail.User do
@doc false
def handle_cast({:subscribe, pid}, %{pids: pids} = state) do
Logger.debug "Subscribe from pid #{:erlang.pid_to_list(pid)}"
unless pid in pids do
Process.monitor(pid)
state = Map.put(state, :pids, [pid|pids])
state = cond do
pid in pids ->
state
true ->
Process.monitor(pid)
Map.put(state, :pids, [pid|pids])
end
{:noreply, state}
end
Expand Down Expand Up @@ -137,12 +140,15 @@ defmodule Gmail.User do
end

@doc false
def handle_call({:message, {:get, message_id, params}}, _from, %{user_id: user_id} = state) do
result =
user_id
|> Message.get(message_id, params)
|> http_execute(state)
|> Message.handle_message_response
def handle_call({:message, {:get, message_ids, params}}, from, state) when is_list(message_ids) do
Gmail.Message.Worker.fetch(from, message_ids, params, state)
{:noreply, state}
end

# TODO check this to use worker
@doc false
def handle_call({:message, {:get, message_id, params}}, _from, state) do
result = Gmail.Message.Worker.get(message_id, params, state);
{:reply, result, state}
end

Expand Down Expand Up @@ -663,11 +669,14 @@ defmodule Gmail.User do
"""
@spec http_execute({atom, String.t, String.t} | {atom, String.t, String.t, map}, map) :: atom | {atom, map | String.t}
def http_execute(action, %{refresh_token: refresh_token, user_id: user_id} = state) do
if OAuth2.access_token_expired?(state) do
Logger.debug "Refreshing access token for #{user_id}"
{access_token, expires_at} = OAuth2.refresh_access_token(refresh_token)
GenServer.cast(String.to_atom(user_id), {:update_access_token, access_token, expires_at})
state = %{state | access_token: access_token}
state = cond do
OAuth2.access_token_expired?(state) ->
Logger.debug "Refreshing access token for #{user_id}"
{access_token, expires_at} = OAuth2.refresh_access_token(refresh_token)
GenServer.cast(String.to_atom(user_id), {:update_access_token, access_token, expires_at})
%{state | access_token: access_token}
true ->
state
end
HTTP.execute(action, state)
end
Expand Down
1 change: 0 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Gmail.Mixfile do
def project do
[app: :gmail,
version: "0.1.11",
elixir: "~> 1.2",
deps: deps,
test_coverage: [tool: ExCoveralls],
preferred_cli_env: ["coveralls": :test, "coveralls.detail": :test, "coveralls.post": :test],
Expand Down
24 changes: 12 additions & 12 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
%{"bunt": {:hex, :bunt, "0.1.6", "5d95a6882f73f3b9969fdfd1953798046664e6f77ec4e486e6fafc7caad97c6f", [:mix], []},
"bypass": {:hex, :bypass, "0.5.1", "cf3e8a4d376ee1dcd89bf362dfaf1f4bf4a6e19895f52fdc2bafbd8207ce435f", [:mix], [{:plug, "~> 1.0", [hex: :plug, optional: false]}, {:cowboy, "~> 1.0", [hex: :cowboy, optional: false]}]},
"certifi": {:hex, :certifi, "0.4.0", "a7966efb868b179023618d29a407548f70c52466bf1849b9e8ebd0e34b7ea11f", [:rebar3], []},
"combine": {:hex, :combine, "0.7.0", "2ac6ae852a9835fe8189af18121cddd5bed2677f5df706dc0d208af668ab845d", [:mix], []},
"combine": {:hex, :combine, "0.9.1", "5fd778ee77032ae593bf79aedb8519d9e36283e4f869abd98c2d6029ca476db8", [:mix], []},
"cowboy": {:hex, :cowboy, "1.0.4", "a324a8df9f2316c833a470d918aaf73ae894278b8aa6226ce7a9bf699388f878", [:rebar, :make], [{:cowlib, "~> 1.0.0", [hex: :cowlib, optional: false]}, {:ranch, "~> 1.0", [hex: :ranch, optional: false]}]},
"cowlib": {:hex, :cowlib, "1.0.2", "9d769a1d062c9c3ac753096f868ca121e2730b9a377de23dec0f7e08b1df84ee", [:make], []},
"credo": {:hex, :credo, "0.3.13", "90d2d2deb9d376bb2a63f81126a320c3920ce65acb1294982ab49a8aacc7d89f", [:mix], [{:bunt, "~> 0.1.4", [hex: :bunt, optional: false]}]},
"dialyxir": {:hex, :dialyxir, "0.3.3", "2f8bb8ab4e17acf4086cae847bd385c0f89296d3e3448dc304c26bfbe4b46cb4", [:mix], []},
"credo": {:hex, :credo, "0.4.5", "5c5daaf50a2a96068c0f21b6fbd382d206702efa8836a946eeab0b8ac25f5f22", [:mix], [{:bunt, "~> 0.1.6", [hex: :bunt, optional: false]}]},
"dialyxir": {:hex, :dialyxir, "0.3.5", "eaba092549e044c76f83165978979f60110dc58dd5b92fd952bf2312f64e9b14", [:mix], []},
"earmark": {:hex, :earmark, "0.2.1", "ba6d26ceb16106d069b289df66751734802777a3cbb6787026dd800ffeb850f3", [:mix], []},
"ex_doc": {:hex, :ex_doc, "0.11.5", "0dc51cb84f8312162a2313d6c71573a9afa332333d8a332bb12540861b9834db", [:mix], [{:earmark, "~> 0.1.17 or ~> 0.2", [hex: :earmark, optional: true]}]},
"excoveralls": {:hex, :excoveralls, "0.5.4", "1a6e116bcf980da8b7fe33140c1d7e61aa0a4e51951cadbfacc420c12d2b9b8f", [:mix], [{:hackney, ">= 0.12.0", [hex: :hackney, optional: false]}, {:exjsx, "~> 3.0", [hex: :exjsx, optional: false]}]},
"ex_doc": {:hex, :ex_doc, "0.12.0", "b774aabfede4af31c0301aece12371cbd25995a21bb3d71d66f5c2fe074c603f", [:mix], [{:earmark, "~> 0.2", [hex: :earmark, optional: false]}]},
"excoveralls": {:hex, :excoveralls, "0.5.5", "d97b6fc7aa59c5f04f2fa7ec40fc0b7555ceea2a5f7e7c442aad98ddd7f79002", [:mix], [{:hackney, ">= 0.12.0", [hex: :hackney, optional: false]}, {:exjsx, "~> 3.0", [hex: :exjsx, optional: false]}]},
"exjsx": {:hex, :exjsx, "3.2.0", "7136cc739ace295fc74c378f33699e5145bead4fdc1b4799822d0287489136fb", [:mix], [{:jsx, "~> 2.6.2", [hex: :jsx, optional: false]}]},
"gettext": {:hex, :gettext, "0.11.0", "80c1dd42d270482418fa158ec5ba073d2980e3718bacad86f3d4ad71d5667679", [:mix], []},
"hackney": {:hex, :hackney, "1.6.0", "8d1e9440c9edf23bf5e5e2fe0c71de03eb265103b72901337394c840eec679ac", [:rebar3], [{:ssl_verify_fun, "1.1.0", [hex: :ssl_verify_fun, optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, optional: false]}, {:metrics, "1.0.1", [hex: :metrics, optional: false]}, {:idna, "1.2.0", [hex: :idna, optional: false]}, {:certifi, "0.4.0", [hex: :certifi, optional: false]}]},
"httpoison": {:hex, :httpoison, "0.8.3", "b675a3fdc839a0b8d7a285c6b3747d6d596ae70b6ccb762233a990d7289ccae4", [:mix], [{:hackney, "~> 1.6.0", [hex: :hackney, optional: false]}]},
"hackney": {:hex, :hackney, "1.6.1", "ddd22d42db2b50e6a155439c8811b8f6df61a4395de10509714ad2751c6da817", [:rebar3], [{:ssl_verify_fun, "1.1.0", [hex: :ssl_verify_fun, optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, optional: false]}, {:metrics, "1.0.1", [hex: :metrics, optional: false]}, {:idna, "1.2.0", [hex: :idna, optional: false]}, {:certifi, "0.4.0", [hex: :certifi, optional: false]}]},
"httpoison": {:hex, :httpoison, "0.9.0", "68187a2daddfabbe7ca8f7d75ef227f89f0e1507f7eecb67e4536b3c516faddb", [:mix], [{:hackney, "~> 1.6.0", [hex: :hackney, optional: false]}]},
"idna": {:hex, :idna, "1.2.0", "ac62ee99da068f43c50dc69acf700e03a62a348360126260e87f2b54eced86b2", [:rebar3], []},
"inch_ex": {:hex, :inch_ex, "0.5.1", "c1c18966c935944cbb2d273796b36e44fab3c54fd59f906ff026a686205b4e14", [:mix], [{:poison, "~> 1.5 or ~> 2.0", [hex: :poison, optional: false]}]},
"inch_ex": {:hex, :inch_ex, "0.5.3", "39f11e96181ab7edc9c508a836b33b5d9a8ec0859f56886852db3d5708889ae7", [:mix], [{:poison, "~> 1.5 or ~> 2.0", [hex: :poison, optional: false]}]},
"jsx": {:hex, :jsx, "2.6.2", "213721e058da0587a4bce3cc8a00ff6684ced229c8f9223245c6ff2c88fbaa5a", [:mix, :rebar], []},
"meck": {:hex, :meck, "0.8.4", "59ca1cd971372aa223138efcf9b29475bde299e1953046a0c727184790ab1520", [:rebar, :make], []},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], []},
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []},
"mock": {:hex, :mock, "0.1.3", "657937b03f88fce89b3f7d6becc9f1ec1ac19c71081aeb32117db9bc4d9b3980", [:mix], [{:meck, "~> 0.8.2", [hex: :meck, optional: false]}]},
"plug": {:hex, :plug, "1.1.4", "2eee0e85ad420db96e075b3191d3764d6fff61422b101dc5b02e9cce99cacfc7", [:mix], [{:cowboy, "~> 1.0", [hex: :cowboy, optional: true]}]},
"poison": {:hex, :poison, "2.1.0", "f583218ced822675e484648fa26c933d621373f01c6c76bd00005d7bd4b82e27", [:mix], []},
"plug": {:hex, :plug, "1.1.6", "8927e4028433fcb859e000b9389ee9c37c80eb28378eeeea31b0273350bf668b", [:mix], [{:cowboy, "~> 1.0", [hex: :cowboy, optional: true]}]},
"poison": {:hex, :poison, "2.2.0", "4763b69a8a77bd77d26f477d196428b741261a761257ff1cf92753a0d4d24a63", [:mix], []},
"ranch": {:hex, :ranch, "1.2.1", "a6fb992c10f2187b46ffd17ce398ddf8a54f691b81768f9ef5f461ea7e28c762", [:make], []},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.0", "edee20847c42e379bf91261db474ffbe373f8acb56e9079acb6038d4e0bf414f", [:rebar, :make], []},
"ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.6"},
"timex": {:hex, :timex, "2.1.4", "17100f0d47473fe05423084039b8f3de9d63617b66d8db676e24a4221216db61", [:mix], [{:tzdata, "~> 0.1.8 or ~> 0.5", [hex: :tzdata, optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, optional: false]}, {:combine, "~> 0.7", [hex: :combine, optional: false]}]},
"tzdata": {:hex, :tzdata, "0.5.7", "10a4176023c2b294f1bbb98594a646ef25d658ca507b2da20a5952072d6080c4", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, optional: false]}]}}
"timex": {:hex, :timex, "2.2.1", "0d69012a7fd69f4cbdaa00cc5f2a5f30f1bed56072fb362ed4bddf60db343022", [:mix], [{:gettext, "~> 0.10", [hex: :gettext, optional: false]}, {:combine, "~> 0.7", [hex: :combine, optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5", [hex: :tzdata, optional: false]}]},
"tzdata": {:hex, :tzdata, "0.5.8", "a4ffe564783c6519e4df230a5d0e1cf44b7db7f576bcae76d05540b5da5b6143", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, optional: false]}]}}

0 comments on commit e31cea7

Please sign in to comment.