Skip to content
This repository has been archived by the owner on Mar 27, 2023. It is now read-only.

Commit

Permalink
Updated the SQS client to use FIFO queues
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrei committed Jul 27, 2017
1 parent 3ff59df commit 18dec1c
Show file tree
Hide file tree
Showing 34 changed files with 327 additions and 191 deletions.
4 changes: 2 additions & 2 deletions app/controllers/api/stateless/members_controller.rb
Expand Up @@ -51,7 +51,7 @@ def permitted_params
end

def update_on_ak(member)
ChampaignQueue.push(
ChampaignQueue.push({
type: 'update_member',
params: {
akid: member.actionkit_user_id,
Expand All @@ -64,7 +64,7 @@ def update_on_ak(member)
address1: member.address1,
address2: member.address2
}
)
}, { group_id: "member:#{member.id}" })
end
end
end
Expand Down
10 changes: 6 additions & 4 deletions app/lib/champaign_queue/clients/sqs.rb
Expand Up @@ -6,22 +6,24 @@ class Sqs
class << self
# +params+ - The message to send. String maximum 256 KB in size.
# +delay+ - The number of seconds (0 to 900 - 15 minutes) to delay a specific message.
def push(params, delay: 0)
new(params, delay).push
def push(params, group_id:, delay: 0)
new(params, group_id, delay).push
end
end

def initialize(params, delay)
def initialize(params, group_id, delay)
@params = params
@delay = delay
@group_id = group_id
end

def push
return false if queue_url.blank?

client.send_message(queue_url: queue_url,
message_body: @params.to_json,
delay_seconds: @delay)
delay_seconds: @delay,
message_group_id: @group_id)
end

private
Expand Down
11 changes: 8 additions & 3 deletions app/models/member.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true

# == Schema Information
#
# Table name: members
Expand Down Expand Up @@ -27,15 +28,15 @@ class Member < ApplicationRecord
has_one :authentication, class_name: 'MemberAuthentication', dependent: :destroy
has_many :payment_methods, through: :customer
has_many :actions
has_paper_trail on: [:update, :destroy]
has_paper_trail on: %i[update destroy]

delegate :authenticate, to: :authentication, allow_nil: true

validates :email, uniqueness: { case_sensitive: true }, allow_nil: true

before_validation { email.try(:downcase!) }

enum donor_status: [:nondonor, :donor, :recurring_donor]
enum donor_status: %i[nondonor donor recurring_donor]

def self.find_from_request(akid: nil, id: nil)
member = find_by_akid(akid)
Expand Down Expand Up @@ -84,7 +85,11 @@ def publish_signup(locale = nil)
postal: postal
}
params[:locale] = locale if locale.present?
ChampaignQueue.push(type: 'subscribe_member', params: params)
ChampaignQueue.push(
{ type: 'subscribe_member',
params: params },
{ group_id: "member:#{id}" }
)
end

def token_payload
Expand Down
27 changes: 17 additions & 10 deletions app/models/payment/braintree/subscription.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true

# == Schema Information
#
# Table name: payment_braintree_subscriptions
Expand Down Expand Up @@ -32,18 +33,24 @@ class Payment::Braintree::Subscription < ApplicationRecord

def publish_cancellation(reason)
# reason can be "user", "admin", "processor", "failure", "expired"
ChampaignQueue.push(type: 'cancel_subscription',
params: {
recurring_id: subscription_id,
canceled_by: reason
})
ChampaignQueue.push(
{ type: 'cancel_subscription',
params: {
recurring_id: subscription_id,
canceled_by: reason
} },
{ group_id: "braintree-subscription:#{id}" }
)
end

def publish_amount_update
ChampaignQueue.push(type: 'recurring_payment_update',
params: {
recurring_id: subscription_id,
amount: amount.to_s
})
ChampaignQueue.push(
{ type: 'recurring_payment_update',
params: {
recurring_id: subscription_id,
amount: amount.to_s
} },
{ group_id: "braintree-subscription:#{id}" }
)
end
end
7 changes: 5 additions & 2 deletions app/models/payment/braintree/transaction.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true

# == Schema Information
#
# Table name: payment_braintree_transactions
Expand Down Expand Up @@ -27,7 +28,7 @@ class Payment::Braintree::Transaction < ApplicationRecord
belongs_to :payment_method, class_name: 'Payment::Braintree::PaymentMethod'
belongs_to :customer, class_name: 'Payment::Braintree::Customer', primary_key: 'customer_id'
belongs_to :subscription, class_name: 'Payment::Braintree::Subscription'
enum status: [:success, :failure]
enum status: %i[success failure]

scope :one_off, -> { where(subscription_id: nil) }

Expand All @@ -41,6 +42,8 @@ def publish_subscription_charge
status: status == 'success' ? 'completed' : 'failed',
amount: amount.to_s
}
}, { delay: 120 })
},
{ delay: 120,
group_id: "braintree-subscription:#{subscription.id}" })
end
end
21 changes: 13 additions & 8 deletions app/models/payment/go_cardless/subscription.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true

# == Schema Information
#
# Table name: payment_go_cardless_subscriptions
Expand Down Expand Up @@ -43,12 +44,13 @@ def call
subscription: subscription
)

ChampaignQueue.push(
ChampaignQueue.push({
type: 'subscription-payment',
params: {
recurring_id: @subscription.go_cardless_id
}
)
},
{ group_id: "gocardless-subscription:#{subscription.id}" })
end
end
end
Expand Down Expand Up @@ -87,7 +89,7 @@ class Payment::GoCardless::Subscription < ApplicationRecord
end

event :run_approve do
transitions from: [:pending, :created], to: :active
transitions from: %i[pending created], to: :active
end

event :run_cancel do
Expand All @@ -109,10 +111,13 @@ class Payment::GoCardless::Subscription < ApplicationRecord

def publish_cancellation(reason)
# reason can be "user", "admin", "processor", "failure", "expired"
ChampaignQueue.push(type: 'cancel_subscription',
params: {
recurring_id: go_cardless_id,
canceled_by: reason
})
ChampaignQueue.push(
{ type: 'cancel_subscription',
params: {
recurring_id: go_cardless_id,
canceled_by: reason
} },
{ group_id: "gocardless-subscription:#{id}" }
)
end
end
9 changes: 6 additions & 3 deletions app/models/payment/go_cardless/transaction.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true

# == Schema Information
#
# Table name: payment_go_cardless_transactions
Expand Down Expand Up @@ -56,11 +57,11 @@ class Payment::GoCardless::Transaction < ApplicationRecord
end

event :run_confirm do
transitions from: [:created, :submitted], to: :confirmed
transitions from: %i[created submitted], to: :confirmed
end

event :run_payout do
transitions from: [:created, :submitted, :confirmed], to: :paid_out
transitions from: %i[created submitted confirmed], to: :paid_out
end

event :run_cancel do
Expand All @@ -86,6 +87,8 @@ def publish_failed_subscription_charge
success: 0,
status: 'failed'
}
}, { delay: 120 })
},
{ delay: 120,
group_id: "gocardless-subscription:#{id}" })
end
end
2 changes: 1 addition & 1 deletion app/services/action_queue.rb
Expand Up @@ -11,7 +11,7 @@ def initialize(action)
end

def push
ChampaignQueue.push(payload.merge(meta))
ChampaignQueue.push(payload.merge(meta), group_id: "action:#{@action.id}")
end

def page
Expand Down
2 changes: 1 addition & 1 deletion app/services/call_event.rb
Expand Up @@ -11,7 +11,7 @@ def initialize(call, extra_params = {})
end

def publish
ChampaignQueue.push(payload)
ChampaignQueue.push(payload, group_id: "call:#{@call.id}")
end

private
Expand Down
10 changes: 7 additions & 3 deletions app/services/campaign_creator.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true

class CampaignCreator
def self.run(params)
new(params).run
Expand All @@ -17,8 +18,11 @@ def run
private

def publish_event
ChampaignQueue.push(type: 'create_campaign',
name: @campaign.name,
campaign_id: @campaign.id)
ChampaignQueue.push(
{ type: 'create_campaign',
name: @campaign.name,
campaign_id: @campaign.id },
{ group_id: "campaign:#{@campaign.id}" }
)
end
end
10 changes: 7 additions & 3 deletions app/services/campaign_updater.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true

class CampaignUpdater
def self.run(campaign, params)
new(campaign, params).run
Expand All @@ -18,8 +19,11 @@ def run
private

def publish_event
ChampaignQueue.push(type: 'update_campaign',
name: @campaign.name,
campaign_id: @campaign.id)
ChampaignQueue.push(
{ type: 'update_campaign',
name: @campaign.name,
campaign_id: @campaign.id },
{ group_id: "campaign:#{@campaign.id}" }
)
end
end
4 changes: 3 additions & 1 deletion app/services/queue_manager.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true

class QueueManager
include Rails.application.routes.url_helpers

Expand Down Expand Up @@ -31,7 +32,8 @@ def push_to_queue

def to_queue(additions = {})
ChampaignQueue.push(
data_for_queue.merge(additions)
data_for_queue.merge(additions),
group_id: "page:#{@page.id}"
)
end

Expand Down
7 changes: 5 additions & 2 deletions spec/features/express_donations/email_one_click_spec.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true

require 'rails_helper'
require_relative 'shared_methods'

Expand Down Expand Up @@ -71,7 +72,8 @@
expect(customer.transactions.count).to eq(1)

@is_authenticated = 1
expect(ChampaignQueue).to receive(:push).with(queue_payload)
expect(ChampaignQueue).to receive(:push)
.with(queue_payload, group_id: /action:\d+/)

VCR.use_cassette('feature_member_email_donation') do
visit page_path(donation_page, amount: '2.10', currency: 'GBP', akid: valid_akid, one_click: true)
Expand All @@ -91,7 +93,8 @@
expect(Action.count).to eq(1)

@is_authenticated = 0
expect(ChampaignQueue).to receive(:push).with(queue_payload)
expect(ChampaignQueue).to receive(:push)
.with(queue_payload, group_id: /action:\d+/)

VCR.use_cassette('feature_one_click_cookie') do
visit page_path(donation_page, amount: '2.10', currency: 'GBP', akid: valid_akid, one_click: true)
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/champaign_queue/clients/sqs_spec.rb
Expand Up @@ -17,7 +17,7 @@
</SendMessageResponse>)
end

let(:request_body) { 'Action=SendMessage&DelaySeconds=0&MessageBody=%7B%22foo%22%3A%22bar%22%7D&QueueUrl=https%3A%2F%2Fsqs.us-east-1.amazonaws.com%2F679051310897%2Fdemo&Version=2012-11-05' }
let(:request_body) { 'Action=SendMessage&DelaySeconds=0&MessageBody=%7B%22foo%22%3A%22bar%22%7D&MessageGroupId=abc&QueueUrl=https%3A%2F%2Fsqs.us-east-1.amazonaws.com%2F679051310897%2Fdemo&Version=2012-11-05' }
let(:request_uri) { 'https://sqs.us-east-1.amazonaws.com/679051310897/demo' }

before do
Expand All @@ -33,7 +33,7 @@

it 'delivers payload to AWS SQS Queue' do
Timecop.freeze('2015/01/01') do
resp = ChampaignQueue::Clients::Sqs.push(foo: :bar)
resp = ChampaignQueue::Clients::Sqs.push({ foo: :bar }, { group_id: 'abc' })

expect(resp.message_id).to eq('918aba5a-b70f-4e31-9905-ba02000fcdaa')
end
Expand All @@ -47,7 +47,7 @@

it 'does not deliver payload to AWS SQS Queue' do
expect_any_instance_of(Aws::SQS::Client).to_not receive(:send_message)
ChampaignQueue::Clients::Sqs.push(foo: :bar)
ChampaignQueue::Clients::Sqs.push({ foo: :bar }, { group_id: 'abc' })
end
end
end
6 changes: 3 additions & 3 deletions spec/lib/champaign_queue_spec.rb
Expand Up @@ -11,9 +11,9 @@

it 'delegates to Client::Sqs' do
expect(ChampaignQueue::Clients::Sqs)
.to receive(:push).with({ foo: 'bar' }, {})
.to receive(:push).with({ foo: 'bar' }, { group_id: 'bla' })

ChampaignQueue.push(foo: 'bar')
ChampaignQueue.push({ foo: 'bar' }, { group_id: 'bla' })
end
end

Expand All @@ -22,7 +22,7 @@
expect(ChampaignQueue::Clients::Sqs)
.to_not receive(:push)

ChampaignQueue.push(foo: 'bar')
ChampaignQueue.push(foo: 'bar', group_id: 'bla')
end
end
end
Expand Down

0 comments on commit 18dec1c

Please sign in to comment.