Skip to content

Commit 34dccb5

Browse files
committed
Code for step 8
1 parent d8d0fd8 commit 34dccb5

2 files changed

Lines changed: 107 additions & 1 deletion

File tree

lib/elixir_popularity/application.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ defmodule ElixirPopularity.Application do
2828
]},
2929
restart: :transient
3030
},
31-
ElixirPopularity.HackernewsIdProcessor
31+
ElixirPopularity.HackernewsIdProcessor,
32+
ElixirPopularity.HackernewsPayloadProcessor
3233
]
3334

3435
# See https://hexdocs.pm/elixir/Supervisor.html
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
defmodule ElixirPopularity.HackernewsPayloadProcessor do
2+
use Broadway
3+
4+
alias Broadway.Message
5+
alias ElixirPopularity.{HNStats, Repo}
6+
7+
def start_link(_opts) do
8+
Broadway.start_link(__MODULE__,
9+
name: __MODULE__,
10+
producers: [
11+
default: [
12+
module:
13+
{BroadwayRabbitMQ.Producer,
14+
queue: ElixirPopularity.RMQPublisher.bulk_item_data_queue_name(),
15+
connection: [
16+
username: "rabbitmq",
17+
password: "rabbitmq"
18+
]},
19+
stages: 1
20+
]
21+
],
22+
processors: [
23+
default: [
24+
stages: 20
25+
]
26+
]
27+
)
28+
end
29+
30+
def handle_message(_processor, %Message{data: data} = message, _context) do
31+
data
32+
|> Jason.decode!()
33+
|> Enum.map(fn entry ->
34+
summarize_data(entry)
35+
end)
36+
|> Enum.filter(fn summary ->
37+
summary.language_present
38+
|> Map.values()
39+
|> Enum.member?(true)
40+
end)
41+
|> Enum.each(fn entry ->
42+
date = entry.date
43+
hn_id = entry.id
44+
type = entry.type
45+
46+
Enum.each(entry.language_present, fn
47+
{lang, true} ->
48+
%HNStats{}
49+
|> HNStats.changeset(%{
50+
item_id: hn_id,
51+
item_type: type,
52+
language: Atom.to_string(lang),
53+
date: date,
54+
occurances: 1
55+
})
56+
|> Repo.insert()
57+
58+
{_lang, false} ->
59+
nil
60+
end)
61+
end)
62+
63+
message
64+
end
65+
66+
defp summarize_data(hn_item) do
67+
%{
68+
id: hn_item["id"],
69+
date: hn_item["item"]["time"],
70+
type: hn_item["item"]["type"],
71+
language_present: language_check(hn_item["item"])
72+
}
73+
end
74+
75+
defp language_check(%{"type" => "story", "text" => text}) when not is_nil(text) do
76+
process_text(text)
77+
end
78+
79+
defp language_check(%{"type" => "story", "title" => text}) when not is_nil(text) do
80+
process_text(text)
81+
end
82+
83+
defp language_check(%{"type" => "comment", "text" => text}) when not is_nil(text) do
84+
process_text(text)
85+
end
86+
87+
defp language_check(%{"type" => "job", "text" => text}) when not is_nil(text) do
88+
process_text(text)
89+
end
90+
91+
defp process_text(text) do
92+
[elixir: ~r/elixir/i, erlang: ~r/erlang/i]
93+
|> Enum.map(fn {lang, regex} ->
94+
{lang, String.match?(text, regex)}
95+
end)
96+
|> Map.new()
97+
end
98+
99+
defp language_check(_item) do
100+
%{
101+
elixir: 0,
102+
erlang: 0
103+
}
104+
end
105+
end

0 commit comments

Comments
 (0)