Skip to content

Commit

Permalink
Code for step 6
Browse files Browse the repository at this point in the history
  • Loading branch information
akoutmos committed Dec 5, 2019
1 parent ccfe4fa commit 2491d6f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
3 changes: 2 additions & 1 deletion lib/elixir_popularity/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ defmodule ElixirPopularity.Application do
}
]},
restart: :transient
}
},
ElixirPopularity.HackernewsIdProcessor
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
64 changes: 64 additions & 0 deletions lib/hackernews_id_processor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule ElixirPopularity.HackernewsIdProcessor do
use Broadway

alias Broadway.Message
alias ElixirPopularity.{HackernewsApi, RMQPublisher}

def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producers: [
default: [
module:
{BroadwayRabbitMQ.Producer,
queue: RMQPublisher.item_id_queue_name(),
connection: [
username: "rabbitmq",
password: "rabbitmq"
]},
stages: 1
]
],
processors: [
default: [
stages: 100
]
],
batchers: [
default: [
batch_size: 10,
batch_timeout: 10_000,
stages: 2
]
]
)
end

@impl true
def handle_message(_processor, message, _context) do
Message.update_data(message, fn hn_id ->
{hn_id, HackernewsApi.get_hn_item(hn_id)}
end)
end

@impl true
def handle_batch(_batcher, messages, _batch_info, _context) do
encoded_payload =
messages
|> Enum.reject(fn
%Message{data: {_id, :error}} -> true
_ -> false
end)
|> Enum.map(fn %Message{data: {id, item}} ->
%{
id: id,
item: Map.from_struct(item)
}
end)
|> Jason.encode!()

RMQPublisher.publish_hn_items(encoded_payload)

messages
end
end

0 comments on commit 2491d6f

Please sign in to comment.