Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ config :algora, Oban,
notify_bounty: 1,
notify_tip_intent: 1,
notify_claim: 1,
transfers: 1,
activity_notifier: 1,
activity_mailer: 1
]
Expand Down
11 changes: 11 additions & 0 deletions lib/algora/admin/admin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ defmodule Algora.Admin do
@moduledoc false
import Ecto.Query

alias Algora.Accounts.User
alias Algora.Payments
alias Algora.Repo
alias Algora.Workspace
alias Algora.Workspace.Ticket
Expand Down Expand Up @@ -72,6 +74,15 @@ defmodule Algora.Admin do
|> Algora.Repo.update()
end

def setup_test_account(user_handle) do
with account_id when is_binary(account_id) <- Algora.config([:stripe, :test_account_id]),
{:ok, user} <- Repo.fetch_by(User, handle: user_handle),
{:ok, acct} <- Payments.create_account(user, "US"),
{:ok, stripe_acct} <- Stripe.Account.retrieve(account_id, []) do
Payments.update_account(acct, stripe_acct)
end
end

defp update_tickets(url, repo_id) do
Repo.update_all(from(t in Ticket, where: fragment("?->>'repository_url' = ?", t.provider_meta, ^url)),
set: [repository_id: repo_id]
Expand Down
2 changes: 1 addition & 1 deletion lib/algora/bounties/bounties.ex
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ defmodule Algora.Bounties do
|> Enum.map(&LineItem.to_stripe/1)
|> Payments.create_stripe_session(%{
description: description,
metadata: %{"version" => "2", "group_id" => tx_group_id}
metadata: %{"version" => Payments.metadata_version(), "group_id" => tx_group_id}
}) do
{:ok, session.url}
end
Expand Down
13 changes: 13 additions & 0 deletions lib/algora/payments/jobs/execute_pending_transfers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Algora.Payments.Jobs.ExecutePendingTransfers do
@moduledoc false
use Oban.Worker,
queue: :transfers,
max_attempts: 1

alias Algora.Payments

@impl Oban.Worker
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
Payments.execute_pending_transfers(user_id)
end
end
101 changes: 101 additions & 0 deletions lib/algora/payments/payments.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
defmodule Algora.Payments do
@moduledoc false
import Ecto.Changeset
import Ecto.Query

alias Algora.Accounts
alias Algora.Accounts.User
alias Algora.MoneyUtils
alias Algora.Payments.Account
alias Algora.Payments.Customer
alias Algora.Payments.PaymentMethod
Expand All @@ -14,6 +16,8 @@ defmodule Algora.Payments do

require Logger

def metadata_version, do: "2"

def broadcast do
Phoenix.PubSub.broadcast(Algora.PubSub, "payments:all", :payments_updated)
end
Expand Down Expand Up @@ -248,4 +252,101 @@ defmodule Algora.Payments do
Repo.delete(account)
end
end

def execute_pending_transfers(user_id) do
pending_amount = get_pending_amount(user_id)

with {:ok, account} <- Repo.fetch_by(Account, user_id: user_id, provider: "stripe", payouts_enabled: true),
true <- Money.positive?(pending_amount) do
initialize_and_execute_transfer(user_id, pending_amount, account)
else
_ -> {:ok, nil}
end
end

defp get_pending_amount(user_id) do
total_credits =
Repo.one(
from(t in Transaction,
where: t.user_id == ^user_id,
where: t.type == :credit,
where: t.status == :succeeded,
select: sum(t.net_amount)
)
) || Money.zero(:USD)

total_transfers =
Repo.one(
from(t in Transaction,
where: t.user_id == ^user_id,
where: t.type == :transfer,
where: t.status == :succeeded or t.status == :processing or t.status == :initialized,
select: sum(t.net_amount)
)
) || Money.zero(:USD)

Money.sub!(total_credits, total_transfers)
end

defp initialize_and_execute_transfer(user_id, pending_amount, account) do
with {:ok, transaction} <- initialize_transfer(user_id, pending_amount),
{:ok, transfer} <- execute_transfer(transaction, account) do
broadcast()
{:ok, transfer}
else
error ->
Logger.error("Failed to execute transfer: #{inspect(error)}")
error
end
end

defp initialize_transfer(user_id, pending_amount) do
%Transaction{}
|> change(%{
id: Nanoid.generate(),
provider: "stripe",
type: :transfer,
status: :initialized,
user_id: user_id,
gross_amount: pending_amount,
net_amount: pending_amount,
total_fee: Money.zero(:USD)
})
|> Algora.Validations.validate_positive(:gross_amount)
|> Algora.Validations.validate_positive(:net_amount)
|> foreign_key_constraint(:user_id)
|> Repo.insert()
end

defp execute_transfer(transaction, account) do
# TODO: set other params
# TODO: provide idempotency key
case Algora.Stripe.create_transfer(%{
amount: MoneyUtils.to_minor_units(transaction.net_amount),
currency: MoneyUtils.to_stripe_currency(transaction.net_amount),
destination: account.provider_id,
metadata: %{"version" => metadata_version()}
}) do
{:ok, transfer} ->
# it's fine if this fails since we'll receive a webhook
transaction
|> change(%{
status: :succeeded,
succeeded_at: DateTime.utc_now(),
provider_id: transfer.id,
provider_meta: Util.normalize_struct(transfer)
})
|> Repo.update()

{:ok, transfer}

{:error, error} ->
# TODO: inconsistent state if this fails
transaction
|> change(%{status: :failed})
|> Repo.update()

{:error, error}
end
end
end
5 changes: 5 additions & 0 deletions lib/algora/shared/money_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ defmodule Algora.MoneyUtils do
amount_int
end

@spec to_stripe_currency(Money.t()) :: String.t()
def to_stripe_currency(money) do
money.currency |> to_string() |> String.downcase()
end

# TODO: Find a way to make this obsolete
# Why does ecto return {currency, amount} instead of Money.t()?
def ensure_money_field(struct, field) do
Expand Down
99 changes: 75 additions & 24 deletions lib/algora_web/controllers/webhooks/stripe_controller.ex
Original file line number Diff line number Diff line change
@@ -1,46 +1,83 @@
defmodule AlgoraWeb.Webhooks.StripeController do
@behaviour Stripe.WebhookHandler

import Ecto.Changeset
import Ecto.Query

alias Algora.Payments
alias Algora.Payments.Jobs.ExecutePendingTransfers
alias Algora.Payments.Transaction
alias Algora.Repo
alias Algora.Util

require Logger

@metadata_version Payments.metadata_version()

@impl true
def handle_event(%Stripe.Event{
type: "charge.succeeded",
data: %{object: %{metadata: %{"version" => "2", "group_id" => group_id}}}
data: %{object: %Stripe.Charge{metadata: %{"version" => @metadata_version, "group_id" => group_id}}}
})
when is_binary(group_id) do
{:ok, count} =
Repo.transact(fn ->
{count, _} =
Repo.update_all(from(t in Transaction, where: t.group_id == ^group_id),
set: [status: :succeeded, succeeded_at: DateTime.utc_now()]
)

# TODO: initiate pending transfers if any recipient has a payout account
# %{transfer_id: transfer_id, user_id: user_id}
# |> Algora.Workers.InitiateTransfer.new()
# |> Oban.insert()

{:ok, count}
end)

if count == 0 do
{:error, :no_transactions_found}
else
Payments.broadcast()
{:ok, nil}
end
Repo.transact(fn ->
update_result =
Repo.update_all(from(t in Transaction, where: t.group_id == ^group_id),
set: [status: :succeeded, succeeded_at: DateTime.utc_now()]
)

# TODO: split into two groups:
# - has active payout account -> execute pending transfers
# - has no active payout account -> notify user to connect payout account
jobs_result =
from(t in Transaction,
where: t.group_id == ^group_id,
where: t.type == :credit,
where: t.status == :succeeded
)
|> Repo.all()
|> Enum.map(fn %{user_id: user_id} -> user_id end)
|> Enum.uniq()
|> Enum.reduce_while(:ok, fn user_id, :ok ->
case %{user_id: user_id}
|> ExecutePendingTransfers.new()
|> Oban.insert() do
{:ok, _job} -> {:cont, :ok}
error -> {:halt, error}
end
end)

with {count, _} when count > 0 <- update_result,
:ok <- jobs_result do
Payments.broadcast()
{:ok, nil}
else
{:error, reason} ->
Logger.error("Failed to update transactions: #{inspect(reason)}")
{:error, :failed_to_update_transactions}

_error ->
Logger.error("Failed to update transactions")
{:error, :failed_to_update_transactions}
end
end)
end

@impl true
def handle_event(%Stripe.Event{type: "transfer.created"} = event) do
Logger.info("Stripe #{event.type} event: #{event.id}")
def handle_event(%Stripe.Event{
type: "transfer.created",
data: %{object: %Stripe.Transfer{metadata: %{"version" => @metadata_version}} = transfer}
}) do
with {:ok, transaction} <- Repo.fetch_by(Transaction, provider: "stripe", provider_id: transfer.id),
{:ok, _transaction} <- maybe_update_transaction(transaction, transfer) do
# TODO: notify user
Payments.broadcast()
{:ok, nil}
else
error ->
Logger.error("Failed to update transaction: #{inspect(error)}")
{:error, :failed_to_update_transaction}
end
end

@impl true
Expand All @@ -50,4 +87,18 @@ defmodule AlgoraWeb.Webhooks.StripeController do

@impl true
def handle_event(_event), do: :ok

defp maybe_update_transaction(transaction, transfer) do
if transaction.status == :succeeded do
{:ok, transaction}
else
transaction
|> change(%{
status: :succeeded,
succeeded_at: DateTime.utc_now(),
provider_meta: Util.normalize_struct(transfer)
})
|> Repo.update()
end
end
end
Loading
Loading