Skip to content

Commit

Permalink
Fetch webhook events from Stripe
Browse files Browse the repository at this point in the history
  • Loading branch information
joshsmith committed Dec 15, 2016
1 parent 232e069 commit 091cfb0
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,79 +5,104 @@ defmodule CodeCorps.StripeService.WebhookProcessing.WebhookProcessor do

alias CodeCorps.StripeEvent
alias CodeCorps.Repo
alias CodeCorps.StripeService.WebhookProcessing.{ConnectEventHandler, PlatformEventHandler}

@api Application.get_env(:code_corps, :stripe)

@doc """
Used to process a Stripe webhook event.
Receives the event json as the first parameter.
Since a webhook can be a platform or a connect webhook,
the function requires the handler module as the second parameter.
Receives the event JSON as the first parameter.
Since a webhook can be a platform or a connect webhook, the function requires
the handler module as the second parameter.
## Returns
* `{:ok, :ignored_by_environment}` if the event was ignored due to environment mismatch
* `{:ok, :enqueued}` if the event will be handled
- `{:ok, pid}` if the event will be handled
- `{:error, :ignored_by_environment}` if the event was ignored due to
environment mismatch
## Note
Stripe events can have their `livemode` property set to `true` or `false`.
A livemode event should be handled by the production environment, while all other environments
handle non-livemode events.
A livemode `true` event should be handled by the production environment,
while all other environments handle livemode `false` events.
"""
def process_async(%{} = json, handler) do
case event_matches_environment?(json) do
true -> do_process_async(json, handler)
false -> {:ok, :ignored_by_environment}
def process_async(%{"id" => id, "livemode" => livemode, "user_id" => user_id} = json, handler) do
case event_matches_environment?(livemode) do
true -> do_process_async(id, user_id, handler, json)
false -> {:error, :ignored_by_environment}
end
end

defp do_process_async(json, handler) do
Task.Supervisor.start_child(:webhook_processor, fn -> do_process(json, handler) end)
def process_async(%{"id" => id, "livemode" => livemode} = json, handler) do
case event_matches_environment?(livemode) do
true -> do_process_async(id, nil, handler, json)
false -> {:error, :ignored_by_environment}
end
end

defp event_matches_environment?(%{"livemode" => livemode}) do
case Application.get_env(:code_corps, :stripe_env) do
:prod -> livemode
_ -> !livemode
end
defp do_process_async(id, user_id, handler, json) do
Task.Supervisor.start_child(:webhook_processor, fn -> do_process(id, user_id, handler, json) end)
end

defp do_process(%{"id" => event_id, "type" => event_type} = json, handler) do
with {:ok, %StripeEvent{} = event} <- find_or_create_event(event_id, event_type) do
case handler.handle_event(json) |> Tuple.to_list do
[:ok, :unhandled_event] -> event |> set_unhandled
[:ok | _results] -> event |> set_processed
[:error | _error] -> event |> set_errored
end
defp do_process(id, user_id, handler, json) do
with {:ok, %Stripe.Event{id: api_event_id, type: api_event_type, user_id: api_user_id}} <- retrieve_event_from_api(id, user_id),
{:ok, endpoint} <- infer_endpoint_from_handler(handler),
{:ok, %StripeEvent{} = event} <- find_or_create_event(api_event_id, api_event_type, api_user_id, endpoint)
do
handle_event(json, event, handler)
else
{:error, :already_processing} -> nil
end
end

defp find_or_create_event(id_from_stripe, type) do
defp event_matches_environment?(livemode) do
case Application.get_env(:code_corps, :stripe_env) do
:prod -> livemode
_ -> !livemode
end
end

defp find_or_create_event(id_from_stripe, type, user_id, endpoint) do
case find_event(id_from_stripe) do
%StripeEvent{status: "processing"} -> {:error, :already_processing}
%StripeEvent{} = event -> {:ok, event}
nil -> create_event(id_from_stripe, type)
nil -> create_event(id_from_stripe, endpoint, type, user_id)
end
end

defp find_event(id_from_stripe) do
Repo.get_by(StripeEvent, id_from_stripe: id_from_stripe)
end

defp create_event(id_from_stripe, type) do
%StripeEvent{} |> StripeEvent.create_changeset(%{id_from_stripe: id_from_stripe, type: type}) |> Repo.insert
defp handle_event(json, event, handler) do
case handler.handle_event(json) |> Tuple.to_list do
[:ok, :unhandled_event] -> event |> set_unhandled
[:ok | _results] -> event |> set_processed
[:error | _error] -> event |> set_errored
end
end

defp set_processed(%StripeEvent{} = event) do
event |> StripeEvent.update_changeset(%{status: "processed"}) |> Repo.update
defp infer_endpoint_from_handler(ConnectEventHandler), do: {:ok, "connect"}
defp infer_endpoint_from_handler(PlatformEventHandler), do: {:ok, "platform"}
defp infer_endpoint_from_handler(_), do: {:error, :invalid_handler}

defp retrieve_event_from_api(id, nil), do: @api.Event.retrieve(id)
defp retrieve_event_from_api(id, user_id), do: @api.Event.retrieve(id, connect_account: user_id)

defp create_event(id_from_stripe, endpoint, type, user_id) do
%StripeEvent{} |> StripeEvent.create_changeset(%{endpoint: endpoint, id_from_stripe: id_from_stripe, type: type, user_id: user_id}) |> Repo.insert
end

defp set_errored(%StripeEvent{} = event) do
event |> StripeEvent.update_changeset(%{status: "errored"}) |> Repo.update
end

defp set_processed(%StripeEvent{} = event) do
event |> StripeEvent.update_changeset(%{status: "processed"}) |> Repo.update
end

defp set_unhandled(%StripeEvent{} = event) do
event |> StripeEvent.update_changeset(%{status: "unhandled"}) |> Repo.update
end
Expand Down
39 changes: 39 additions & 0 deletions lib/code_corps/stripe_testing/event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule CodeCorps.StripeTesting.Event do
def retrieve(id, _opts = [connect_account: _]) do
{:ok, do_retrieve_connect(id)}
end
def retrieve(id) do
{:ok, do_retrieve(id)}
end

defp do_retrieve(_) do
{:ok, created} = DateTime.from_unix(1479472835)

%Stripe.Event{
api_version: "2016-07-06",
created: created,
id: "evt_123",
livemode: false,
object: "event",
pending_webhooks: 1,
request: nil,
type: "any.event"
}
end

defp do_retrieve_connect(_) do
{:ok, created} = DateTime.from_unix(1479472835)

%Stripe.Event{
api_version: "2016-07-06",
created: created,
id: "evt_123",
livemode: false,
object: "event",
pending_webhooks: 1,
request: nil,
type: "any.event",
user_id: "acct_123"
}
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ defmodule CodeCorps.Mixfile do
{:scrivener_ecto, "~> 1.0"}, # DB query pagination
{:segment, github: "stueccles/analytics-elixir"}, # Segment analytics
{:sentry, "~> 2.0"}, # Sentry error tracking
{:stripity_stripe, "~> 2.0.0-alpha.5"}, # Stripe
{:stripity_stripe, git: "https://github.com/code-corps/stripity_stripe.git", branch: "2.0"}, # Stripe
{:timber, "~> 0.4"}, # Logging
{:timex, "~> 3.0"},
{:timex_ecto, "~> 3.0"},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"sentry": {:hex, :sentry, "2.0.2", "f08638758f7bf891e238466009f6cd702fc26d87286663af26927a78ed149346", [:mix], [{:hackney, "~> 1.6.1", [hex: :hackney, optional: false]}, {:plug, "~> 1.0", [hex: :plug, optional: true]}, {:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, optional: false]}, {:uuid, "~> 1.0", [hex: :uuid, optional: false]}]},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:rebar, :make], []},
"stripe_eventex": {:hex, :stripe_eventex, "1.0.0", "782016598b751c0fdb5489038c92c30a5aab034636d0d9d3a486f75a01fbf0b6", [:mix], [{:cowboy, "~> 1.0.0", [hex: :cowboy, optional: false]}, {:plug, "~> 1.0", [hex: :plug, optional: false]}, {:poison, "~> 2.0", [hex: :poison, optional: false]}]},
"stripity_stripe": {:hex, :stripity_stripe, "2.0.0-alpha.5", "ba6d4ffc6251029135c76e9c6e2dd77580713f5c6833fb82da708336023bbfa2", [:mix], [{:hackney, "~> 1.6", [hex: :hackney, optional: false]}, {:poison, "~> 2.0 or ~> 3.0", [hex: :poison, optional: false]}]},
"stripity_stripe": {:git, "https://github.com/code-corps/stripity_stripe.git", "d26b09aff994a30a17bcd35eff6863cbaadf5ec8", [branch: "2.0"]},
"timber": {:hex, :timber, "0.4.7", "df3fcd79bcb4eb4b53874d906ef5f3a212937b4bc7b7c5b244745202cc389443", [:mix], [{:ecto, "~> 2.0", [hex: :ecto, optional: true]}, {:phoenix, "~> 1.2", [hex: :phoenix, optional: true]}, {:plug, "~> 1.2", [hex: :plug, optional: true]}, {:poison, "~> 2.0 or ~> 3.0", [hex: :poison, optional: false]}]},
"timex": {:hex, :timex, "3.1.5", "413d6d8d6f0162a5d47080cb8ca520d790184ac43e097c95191c7563bf25b428", [:mix], [{:combine, "~> 0.7", [hex: :combine, optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5", [hex: :tzdata, optional: false]}]},
"timex_ecto": {:hex, :timex_ecto, "3.0.5", "3ec6c25e10d2c0020958e5df64d2b5e690e441faa2c2259da8bc6bd3d7f39256", [:mix], [{:ecto, "~> 2.0", [hex: :ecto, optional: false]}, {:timex, "~> 3.0", [hex: :timex, optional: false]}]},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule CodeCorps.Repo.Migrations.AddUserIdAndEndpointToStripeEvents do
use Ecto.Migration

def change do
alter table(:stripe_events) do
add :endpoint, :string, null: false
add :user_id, :string
end
end
end
3 changes: 2 additions & 1 deletion test/controllers/stripe_connect_events_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ defmodule CodeCorps.StripeConnectEventsControllerTest do
"object" => "event",
"pending_webhooks" => 1,
"request" => nil,
"type" => type
"type" => type,
"user_id" => "acct_123"
}
end

Expand Down
15 changes: 13 additions & 2 deletions test/models/stripe_event_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule CodeCorps.StripeEventTest do
alias CodeCorps.StripeEvent

describe "create_changeset/2" do
@valid_attrs %{id_from_stripe: "evt_123", type: "any.event"}
@valid_attrs %{endpoint: "connect", id_from_stripe: "evt_123", type: "any.event"}

test "reports as valid when attributes are valid" do
changeset = StripeEvent.create_changeset(%StripeEvent{}, @valid_attrs)
Expand All @@ -15,6 +15,7 @@ defmodule CodeCorps.StripeEventTest do
changeset = StripeEvent.create_changeset(%StripeEvent{}, %{})

refute changeset.valid?
assert changeset.errors[:endpoint] == {"can't be blank", []}
assert changeset.errors[:id_from_stripe] == {"can't be blank", []}
assert changeset.errors[:type] == {"can't be blank", []}
end
Expand All @@ -27,6 +28,16 @@ defmodule CodeCorps.StripeEventTest do

assert record.status == "processing"
end

test "prevents :endpoint from being invalid" do
event = insert(:stripe_event)

attrs = %{endpoint: "random", id_from_stripe: "evt_123", type: "any.event"}
changeset = StripeEvent.create_changeset(event, attrs)

refute changeset.valid?
assert changeset.errors[:endpoint] == {"is invalid", []}
end
end

describe "update_changeset/2" do
Expand All @@ -51,7 +62,7 @@ defmodule CodeCorps.StripeEventTest do
test "prevents :status from being invalid" do
event = insert(:stripe_event)

changeset = StripeEvent.update_changeset(event, %{status: "invalid"})
changeset = StripeEvent.update_changeset(event, %{status: "random"})

refute changeset.valid?
assert changeset.errors[:status] == {"is invalid", []}
Expand Down
13 changes: 7 additions & 6 deletions test/support/factories.ex
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,6 @@ defmodule CodeCorps.Factories do
}
end

def stripe_file_upload_factory do
%CodeCorps.StripeFileUpload{
id_from_stripe: sequence(:id_from_stripe, &"stripe_id_#{&1}"),
}
end

def stripe_connect_subscription_factory do
stripe_connect_plan = build(:stripe_connect_plan)
%CodeCorps.StripeConnectSubscription{
Expand All @@ -160,12 +154,19 @@ defmodule CodeCorps.Factories do

def stripe_event_factory do
%CodeCorps.StripeEvent{
endpoint: sequence(:endpoint, fn(_) -> Enum.random(~w{ connect platform }) end),
id_from_stripe: sequence(:id_from_stripe, &"stripe_id_#{&1}"),
status: sequence(:status, fn(_) -> Enum.random(~w{ unprocessed processed errored }) end),
type: "test.type"
}
end

def stripe_file_upload_factory do
%CodeCorps.StripeFileUpload{
id_from_stripe: sequence(:id_from_stripe, &"stripe_id_#{&1}"),
}
end

def stripe_platform_customer_factory do
%CodeCorps.StripePlatformCustomer{
created: Timex.now,
Expand Down
2 changes: 1 addition & 1 deletion web/controllers/stripe_connect_events_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ defmodule CodeCorps.StripeConnectEventsController do

def create(conn, params) do
case WebhookProcessor.process_async(params, ConnectEventHandler) do
{:ok, :ignored_by_environment} -> conn |> send_resp(400, "")
{:ok, _pid} -> conn |> send_resp(200, "")
{:error, :ignored_by_environment} -> conn |> send_resp(400, "")
end
end
end
2 changes: 1 addition & 1 deletion web/controllers/stripe_platform_events_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ defmodule CodeCorps.StripePlatformEventsController do

def create(conn, params) do
case WebhookProcessor.process_async(params, PlatformEventHandler) do
{:ok, :ignored_by_environment} -> conn |> send_resp(400, "")
{:ok, _pid} -> conn |> send_resp(200, "")
{:error, :ignored_by_environment} -> conn |> send_resp(400, "")
end
end
end
15 changes: 11 additions & 4 deletions web/models/stripe_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule CodeCorps.StripeEvent do
## Fields
* `endpoint` - "connect" or "platform"
* `id_from_stripe` - Stripe's `id`
* `status` - "unprocessed", "processed", or "errored"
Expand All @@ -21,24 +22,26 @@ defmodule CodeCorps.StripeEvent do
use CodeCorps.Web, :model

schema "stripe_events" do
field :endpoint, :string, null: false
field :id_from_stripe, :string, null: false
field :status, :string, default: "unprocessed"
field :type, :string, null: false
field :user_id, :string

timestamps()
end

@doc """
Builds a changeset for storing a new event reference into the database.
Accepts `:id_from_stripe` only. The `status` field is set to "unprocessed"
by default.
The `status` field is set to "unprocessed" by default.
"""
def create_changeset(struct, params \\ %{}) do
struct
|> cast(params, [:id_from_stripe, :type])
|> validate_required([:id_from_stripe, :type])
|> cast(params, [:endpoint, :id_from_stripe, :type, :user_id])
|> validate_required([:endpoint, :id_from_stripe, :type])
|> put_change(:status, "processing")
|> validate_inclusion(:status, states)
|> validate_inclusion(:endpoint, endpoints)
|> unique_constraint(:id_from_stripe)
end

Expand All @@ -54,6 +57,10 @@ defmodule CodeCorps.StripeEvent do
|> validate_inclusion(:status, states)
end

defp endpoints do
~w{ connect platform }
end

defp states do
~w{ errored processed processing unhandled unprocessed }
end
Expand Down

0 comments on commit 091cfb0

Please sign in to comment.