Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the example app to work with latest Broadway and Broadway SQS #56

Merged
merged 9 commits into from Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 29 additions & 1 deletion .github/workflows/ci.yml
Expand Up @@ -26,8 +26,12 @@ jobs:
- elixir: 1.10.4
otp: 21.3.8.16
- elixir: 1.10.4
otp: 23.0.3
otp: 23.2.1
check_formatted: true
- elixir: 1.11.3
otp: 23.2.1
check_formatted: true
warnings_as_errors: true
steps:
- uses: actions/checkout@v2.3.2
- uses: actions/setup-elixir@v1.5.0
Expand All @@ -44,3 +48,27 @@ jobs:
- run: mix compile --warnings-as-errors
if: matrix.warnings_as_errors
- run: mix test
example_app_test:
name: example app test (Elixir 1.11.3 | Erlang/OTP 23.2.1)
runs-on: ubuntu-latest
env:
MIX_ENV: test
strategy:
fail-fast: false
defaults:
run:
working-directory: examples/sqs_example
steps:
- uses: actions/checkout@v2.3.2
- uses: actions/setup-elixir@v1.5.0
with:
otp-version: 23.2.1
elixir-version: 1.11.3
- run: mix format --check-formatted
- name: Install Dependencies
run: |
mix local.hex --force
mix local.rebar --force
mix deps.get --only test
- run: mix compile --warnings_as_errors
- run: mix test
35 changes: 8 additions & 27 deletions examples/sqs_example/config/config.exs
@@ -1,34 +1,15 @@
# This file is responsible for configuring your application
# and its dependencies with the aid of the Mix.Config module.
use Mix.Config

# This configuration is loaded before any dependency and is restricted
# to this project. If another project depends on this project, this
# file won't be loaded nor affect the parent project. For this reason,
# if you want to provide default values for your application for
# 3rd-party users, it should be done in your "mix.exs" file.

# You can configure your application as:
#

config :broadway_sqs_example,
producer_module:
{BroadwaySQS.Producer,
sqs_client: BroadwaySQS.ExAwsClient,
config: [
# access_key_id: "YOUR_AWS_ACCESS_KEY_ID",
# secret_access_key: "YOUR_AWS_SECRET_ACCESS_KEY"
region: "us-east-2"
]},
int_queue: "TEST-int-queue",
string_queue: "TEST-string-queue"

#
# and access this configuration in your application as:
#
# Application.get_env(:broadway_sqs_example, :key)
#
# You can also configure a 3rd-party app:
#
# config :logger, level: :info
#

# It is also possible to import configuration files, relative to this
# directory. For example, you can emulate configuration per environment
# by uncommenting the line below and defining dev.exs, test.exs and such.
# Configuration from the imported file will override the ones defined
# here (which is why it is important to import them last).
#
import_config "#{Mix.env()}.exs"
2 changes: 1 addition & 1 deletion examples/sqs_example/config/test.exs
@@ -1,3 +1,3 @@
use Mix.Config

config :broadway_sqs_example, :broadway_sqs_implementation, BroadwaySQS.TestClient
config :broadway_sqs_example, producer_module: {Broadway.DummyProducer, []}
18 changes: 0 additions & 18 deletions examples/sqs_example/lib/broadway_sqs_test_client.ex

This file was deleted.

23 changes: 5 additions & 18 deletions examples/sqs_example/lib/int_squared_broadway.ex
Expand Up @@ -3,27 +3,14 @@ defmodule BroadwaySQSExample.IntSquared do

alias Broadway.Message

@broadway_sqs_implementation Application.get_env(
:broadway_sqs_example,
:broadway_sqs_implementation,
BroadwaySQS.ExAwsClient
)

def start_link(_opts) do
{module, opts} = Application.get_env(:broadway_sqs_example, :producer_module)
opts = opts ++ [queue_url: Application.get_env(:broadway_sqs_example, :int_queue)]

Broadway.start_link(__MODULE__,
name: __MODULE__,
producers: [
default: [
module:
{BroadwaySQS.Producer,
sqs_client: @broadway_sqs_implementation,
queue_url: Application.get_env(:broadway_sqs_example, :int_queue),
config: [
# access_key_id: "YOUR_AWS_ACCESS_KEY_ID",
# secret_access_key: "YOUR_AWS_SECRET_ACCESS_KEY"
region: "us-east-2"
]}
]
producer: [
module: {module, opts}
],
processors: [
default: []
Expand Down
25 changes: 6 additions & 19 deletions examples/sqs_example/lib/string_broadway.ex
Expand Up @@ -3,30 +3,17 @@ defmodule BroadwaySQSExample.String do

alias Broadway.Message

@broadway_sqs_implementation Application.get_env(
:broadway_sqs_example,
:broadway_sqs_implementation,
BroadwaySQS.ExAwsClient
)

def start_link(_opts) do
{module, opts} = Application.get_env(:broadway_sqs_example, :producer_module)
opts = opts ++ [queue_url: Application.get_env(:broadway_sqs_example, :string_queue)]

Broadway.start_link(__MODULE__,
name: __MODULE__,
producers: [
default: [
module:
{BroadwaySQS.Producer,
sqs_client: @broadway_sqs_implementation,
queue_url: Application.get_env(:broadway_sqs_example, :string_queue),
config: [
# access_key_id: "YOUR_AWS_ACCESS_KEY_ID",
# secret_access_key: "YOUR_AWS_SECRET_ACCESS_KEY"
region: "us-east-2"
]}
]
producer: [
module: {module, opts}
],
processors: [
default: []
default: [concurrency: 1]
],
batchers: [
default: [
Expand Down
30 changes: 16 additions & 14 deletions examples/sqs_example/mix.lock
@@ -1,17 +1,19 @@
%{
"broadway": {:hex, :broadway, "0.4.0", "b8daf580baed44347a9690449f9d8d7a308c1ca086648397d1a88eea893d047e", [:mix], [{:gen_stage, "~> 0.14", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm"},
"broadway": {:hex, :broadway, "0.6.2", "ef8e0d257420c72f0e600958cf95556835d9921ad14be333493083226458791a", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f4f93704304a736c984cd6ed884f697415f68eb50906f4dc5d641926366ad8fa"},
"broadway_sqs": {:hex, :broadway_sqs, "0.1.0", "dd9d2d404ccbca9252fbbad54551bb73d685c1c37103e37adfa21267da888fe5", [:mix], [{:broadway, "~> 0.1", [hex: :broadway, repo: "hexpm", optional: false]}, {:ex_aws_sqs, "~> 2.0", [hex: :ex_aws_sqs, repo: "hexpm", optional: false]}, {:sweet_xml, "~> 0.6", [hex: :sweet_xml, repo: "hexpm", optional: false]}], "hexpm"},
"certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"},
"ex_aws": {:hex, :ex_aws, "2.1.1", "1e4de2106cfbf4e837de41be41cd15813eabc722315e388f0d6bb3732cec47cd", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "1.6.3 or 1.6.5 or 1.7.1 or 1.8.6 or ~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8", [hex: :jsx, repo: "hexpm", optional: true]}, {:poison, ">= 1.2.0", [hex: :poison, repo: "hexpm", optional: true]}, {:sweet_xml, "~> 0.6", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm"},
"ex_aws_sqs": {:hex, :ex_aws_sqs, "3.0.0", "a1e3e04ac0b0e25008d965e742b0800a7029914f6f485d7022f3aaf34d052b5b", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}], "hexpm"},
"gen_stage": {:hex, :gen_stage, "0.14.2", "6a2a578a510c5bfca8a45e6b27552f613b41cf584b58210f017088d3d17d0b14", [:mix], [], "hexpm"},
"hackney": {:hex, :hackney, "1.15.1", "9f8f471c844b8ce395f7b6d8398139e26ddca9ebc171a8b91342ee15a19963f4", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.4", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"httpoison": {:hex, :httpoison, "0.13.0", "bfaf44d9f133a6599886720f3937a7699466d23bb0cd7a88b6ba011f53c6f562", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm"},
"sweet_xml": {:hex, :sweet_xml, "0.6.6", "fc3e91ec5dd7c787b6195757fbcf0abc670cee1e4172687b45183032221b66b8", [:mix], [], "hexpm"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm"},
"certifi": {:hex, :certifi, "2.5.3", "70bdd7e7188c804f3a30ee0e7c99655bc35d8ac41c23e12325f36ab449b70651", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "ed516acb3929b101208a9d700062d520f3953da3b6b918d866106ffa980e1c10"},
"ex_aws": {:hex, :ex_aws, "2.1.6", "41ab8b4caa48035c96d07faa035d2d9de6df480e7e084c054e662ac888dcd4d4", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8", [hex: :jsx, repo: "hexpm", optional: true]}, {:sweet_xml, "~> 0.6", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "a541bd042c1ee26412bb1e749ddf2a1c327e4fb7e382b1cd227e1b00eed3d469"},
"ex_aws_sqs": {:hex, :ex_aws_sqs, "3.2.1", "fc6772b1cd894a73494498f73820f4171e88f48dadcb64c632d1413fb4592cdb", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:saxy, "~> 1.1", [hex: :saxy, repo: "hexpm", optional: true]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "ae77e296dffc0608221f14287cea5621b4419b94794804fd20bbf6cf8c71561e"},
"gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"},
"hackney": {:hex, :hackney, "1.17.0", "717ea195fd2f898d9fe9f1ce0afcc2621a41ecfe137fae57e7fe6e9484b9aa99", [:rebar3], [{:certifi, "~>2.5", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "64c22225f1ea8855f584720c0e5b3cd14095703af1c9fbc845ba042811dc671c"},
"httpoison": {:hex, :httpoison, "0.13.0", "bfaf44d9f133a6599886720f3937a7699466d23bb0cd7a88b6ba011f53c6f562", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "4846958172d6401c4f34ecc5c2c4607b5b0d90b8eec8f6df137ca4907942ed0f"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"nimble_options": {:hex, :nimble_options, "0.3.5", "a4f6820cdcb4ee444afd78635f323e58e8a5ddf2fbbe9b9d283a99f972034bae", [:mix], [], "hexpm", "f5507cc90033a8d12769522009c80aa9164af6bab245dbd4ad421d008455f1e1"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"saxy": {:hex, :saxy, "1.3.0", "61b52697a3235be68ce6f8ecc2c7032f3c01184b14d142a7d09270019e32fbf9", [:mix], [], "hexpm", "c9770a08c168be95c8d8249a9051dd5522641941ec1d9cb843e7d12dc101b6d2"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
}
8 changes: 4 additions & 4 deletions examples/sqs_example/test/int_queue_test.exs
Expand Up @@ -3,15 +3,15 @@ defmodule IntQueueTest do
doctest BroadwaySQSExample.IntSquared

test "Acks work" do
ref = Broadway.test_messages(BroadwaySQSExample.IntSquared, ["1", "2", "3"])
assert_receive {:ack, ^ref, successful, failed}, 5000
ref = Broadway.test_batch(BroadwaySQSExample.IntSquared, ["1", "2", "3"])
assert_receive {:ack, ^ref, successful, failed}, 3_000
assert length(successful) == 3
assert length(failed) == 0
end

test "squares numbers" do
ref = Broadway.test_messages(BroadwaySQSExample.IntSquared, ["1", "2", "3"])
assert_receive {:ack, ^ref, successful, failed}, 5000
ref = Broadway.test_batch(BroadwaySQSExample.IntSquared, ["1", "2", "3"])
assert_receive {:ack, ^ref, successful, failed}, 3_000
assert length(successful) == 3
assert length(failed) == 0

Expand Down
9 changes: 5 additions & 4 deletions examples/sqs_example/test/string_queue_test.exs
Expand Up @@ -3,15 +3,16 @@ defmodule StringQueueTest do
doctest BroadwaySQSExample.String

test "Acks work" do
ref = Broadway.test_messages(BroadwaySQSExample.String, ["1", "2", "3"])
assert_receive {:ack, ^ref, successful, failed}, 5000
ref = Broadway.test_batch(BroadwaySQSExample.String, ["1", "2", "3"])

assert_receive {:ack, ^ref, successful, failed}, 3_000
assert length(successful) == 3
assert length(failed) == 0
end

test "combines the same string twice" do
ref = Broadway.test_messages(BroadwaySQSExample.String, ["1", "2", "3"])
assert_receive {:ack, ^ref, successful, failed}, 5000
ref = Broadway.test_batch(BroadwaySQSExample.String, ["1", "2", "3"])
assert_receive {:ack, ^ref, successful, failed}, 3_000
assert length(successful) == 3
assert length(failed) == 0

Expand Down
4 changes: 2 additions & 2 deletions lib/broadway_sqs/producer.ex
Expand Up @@ -16,7 +16,7 @@ defmodule BroadwaySQS.Producer do
producers (regardless of the client implementation), all other options are specific to
the `BroadwaySQS.ExAwsClient`, which is the default client.

#{NimbleOptions.Docs.generate(BroadwaySQS.Options.definition())}
#{NimbleOptions.docs(BroadwaySQS.Options.definition())}

## Acknowledgments

Expand Down Expand Up @@ -148,7 +148,7 @@ defmodule BroadwaySQS.Producer do
end

@impl true
def prepare_for_start(module, broadway_opts) do
def prepare_for_start(_module, broadway_opts) do
{producer_module, client_opts} = broadway_opts[:producer][:module]

if Keyword.has_key?(client_opts, :queue_name) do
Expand Down