Simplify manager init function
oltarasenko committed Dec 21, 2020
1 parent 36d638c commit 2bdc360
Showing 5 changed files with 48 additions and 45 deletions.
69 changes: 34 additions & 35 deletions lib/crawly/manager.ex
@@ -28,6 +28,7 @@ defmodule Crawly.Manager do
   require Logger

   @timeout 60_000
+  @start_request_split_size 50

   use GenServer
@@ -66,6 +67,25 @@ defmodule Crawly.Manager do

    Process.link(request_storage_pid)

+   # Add start requests to the requests storage
+   init = spider_name.init(options)
+
+   start_requests_from_req = Keyword.get(init, :start_requests, [])
+
+   start_requests_from_urls =
+     init
+     |> Keyword.get(:start_urls, [])
+     |> Crawly.Utils.requests_from_urls()
+
+   start_requests = start_requests_from_req ++ start_requests_from_urls
+
+   # Split start requests, so it's possible to initialize a part of them
+   # in an async manner
+   {start_req, async_start_req} =
+     Enum.split(start_requests, @start_request_split_size)
+
+   :ok = store_requests(spider_name, start_req)
+
    # Start workers
    num_workers =
      Utils.get_settings(:concurrent_requests_per_domain, spider_name, 4)
@@ -95,45 +115,15 @@ defmodule Crawly.Manager do
       tref: tref,
       prev_scraped_cnt: 0,
       workers: worker_pids
-    }, {:continue, {:startup, options}}}
+    }, {:continue, {:startup, {spider_name, async_start_req}}}}
  end

  @impl true
-  def handle_continue({:startup, options}, state) do
-    # Getting spider start urls
-    init = state.name.init(options)
-    spider_name = state.name
-    # Store start urls
-    start_requests = Keyword.get(init, :start_requests, [])
-    {sync_start_req, async_start_req} = Enum.split(start_requests, 1000)
-
-    store_req_fun = fn
-      %Crawly.Request{} = request ->
-        Crawly.RequestsStorage.store(spider_name, request)
-
-      request ->
-        # We should not attempt to store something which is not a request
-        Logger.error(
-          "#{inspect(request)} does not seem to be a request. Ignoring."
-        )
-
-        :ignore
-    end
-
-    Enum.each(sync_start_req, store_req_fun)
-    Task.start_link(fn -> Enum.each(async_start_req, store_req_fun) end)
-
-    start_urls = Keyword.get(init, :start_urls, [])
-    {sync_start_urls, async_start_urls} = Enum.split(start_urls, 1000)
-
-    store_url_fun = fn url ->
-      Crawly.RequestsStorage.store(spider_name, Crawly.Request.new(url))
-    end
-
-    Enum.each(sync_start_urls, store_url_fun)
-    Task.start_link(fn -> Enum.each(async_start_urls, store_url_fun) end)
+  def handle_continue({:startup, {spider_name, async_start_req}}, state) do
+    Task.start(fn ->
+      store_requests(spider_name, async_start_req)
+    end)

    # return state as unchanged
    {:noreply, state}
  end

@@ -225,4 +215,13 @@ defmodule Crawly.Manager do
    do: String.to_integer(value)

  defp maybe_convert_to_integer(value) when is_integer(value), do: value
+
+  defp store_requests(spider_name, requests) do
+    Enum.each(
+      requests,
+      fn request ->
+        Crawly.RequestsStorage.store(spider_name, request)
+      end
+    )
+  end
 end
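
The heart of the change: spider initialization now happens once in `init/1`, the first `@start_request_split_size` (50) start requests are stored synchronously, and only the remainder is deferred to `handle_continue/2`, where a fire-and-forget `Task` stores it in the background. A minimal, self-contained sketch of that pattern (module name, `@split_size`, and the `store/1` stub are invented for illustration; they are not Crawly APIs):

```elixir
defmodule SplitStartupSketch do
  use GenServer

  @split_size 50

  def start_link(requests),
    do: GenServer.start_link(__MODULE__, requests)

  @impl true
  def init(requests) do
    # Store the first chunk synchronously so workers have requests immediately.
    {sync, async} = Enum.split(requests, @split_size)
    Enum.each(sync, &store/1)

    # Defer the (possibly large) remainder until after init/1 returns.
    {:ok, %{}, {:continue, {:startup, async}}}
  end

  @impl true
  def handle_continue({:startup, async}, state) do
    # Task.start/1 is unlinked: a crash while storing won't kill the manager.
    Task.start(fn -> Enum.each(async, &store/1) end)
    {:noreply, state}
  end

  defp store(request), do: IO.inspect(request, label: "stored")
end
```

Returning `{:continue, ...}` from `init/1` guarantees `handle_continue/2` runs before any other message is processed, so startup stays fast even for spiders with thousands of start requests. Note the switch from the old `Task.start_link` to `Task.start`: the async storing task is no longer linked to the manager.
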
7 changes: 6 additions & 1 deletion lib/crawly/requests_storage/requests_storage.ex
@@ -49,10 +49,15 @@ defmodule Crawly.RequestsStorage do
  @spec store(spider_name, request) :: :ok
        when spider_name: atom(),
             request: Crawly.Request.t()
-  def store(spider_name, request) do
+  def store(spider_name, %Crawly.Request{} = request) do
    store(spider_name, [request])
  end

+  def store(_spider_name, request) do
+    Logger.error("#{inspect(request)} does not seem to be a request. Ignoring.")
+    {:error, :not_request}
+  end
+
  @doc """
  Pop a request out of requests storage
  """
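
Request validation moves out of the manager's anonymous `store_req_fun` and into the storage API itself via multi-clause pattern matching: a struct-matching clause accepts real requests, and a fallback clause logs and returns `{:error, :not_request}`. A self-contained sketch of the same idiom (module and struct invented for illustration):

```elixir
defmodule StoreValidationSketch do
  require Logger

  defmodule Request do
    defstruct [:url]
  end

  # Matches only proper Request structs.
  def store(name, %Request{} = request) do
    Logger.info("storing #{request.url} for #{inspect(name)}")
    :ok
  end

  # Everything else is rejected instead of crashing the caller.
  def store(_name, other) do
    Logger.error("#{inspect(other)} does not seem to be a request. Ignoring.")
    {:error, :not_request}
  end
end
```

One caveat visible in the hunk: the `@spec` still promises `:ok`, while the new fallback clause can also return `{:error, :not_request}`, so Dialyzer users may want the spec widened.
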
4 changes: 2 additions & 2 deletions lib/crawly/requests_storage/requests_storage_worker.ex
@@ -105,8 +105,8 @@ defmodule Crawly.RequestsStorage.Worker do
      GenServer.call(pid, command)
    catch
      error, reason ->
-        Logger.error("Could not fetch a request: #{inspect(reason)}")
-        Logger.error(Exception.format(:error, error, __STACKTRACE__))
+        Logger.debug("Could not fetch a request: #{inspect(reason)}")
+        Logger.debug(Exception.format(:error, error, __STACKTRACE__))
    end
  end
end
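
Demoting these logs from `:error` to `:debug` reflects that a storage worker being down is an expected, handled condition rather than a fault. With a typical production logger level the messages now vanish from output entirely; they can be brought back while debugging (standard Elixir Logger configuration, not Crawly-specific):

```elixir
# e.g. in config/dev.exs
config :logger, level: :debug
```
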
8 changes: 2 additions & 6 deletions test/manager_test.exs
@@ -64,20 +64,18 @@ defmodule ManagerTest do
  test "Closespider itemcount is respected" do
    Process.register(self(), :spider_closed_callback_test)
    :ok = Crawly.Engine.start_spider(Manager.TestSpider)
-    Process.sleep(501)
+    Process.sleep(1_000)
    assert_receive :itemcount_limit
  end

  test "Closespider timeout is respected" do
    Process.register(self(), :close_by_timeout_listener)

    :ok = Crawly.Engine.start_spider(Manager.CloseByTimeoutSpider)
-    Process.sleep(501)
+    Process.sleep(1_000)
    assert_receive :itemcount_timeout
  end

-  #
-
  test "Can't start already started spider" do
    :ok = Crawly.Engine.start_spider(Manager.TestSpider)

@@ -180,8 +178,6 @@ defmodule Manager.CloseByTimeoutSpider do

  def override_settings() do
    on_spider_closed_callback = fn reason ->
-      IO.puts("Stopped #{reason}")
-
      case Process.whereis(:close_by_timeout_listener) do
        nil ->
          :nothing_to_do
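
The longer `Process.sleep(1_000)` gives the now partly asynchronous request storing time to finish before the assertion. Worth knowing when touching these tests: `assert_receive/2` takes its own timeout, so the same grace period can be expressed without a fixed sleep (standard ExUnit, shown as an illustration only):

```elixir
# Waits up to one second, but returns as soon as the message arrives:
assert_receive :itemcount_limit, 1_000
```
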
5 changes: 4 additions & 1 deletion test/request_storage_test.exs
@@ -60,7 +60,10 @@ defmodule RequestStorageTest do
      Crawly.RequestsStorage.stats(:unkown)

    assert {:error, :storage_worker_not_running} ==
-             Crawly.RequestsStorage.store(%{}, :unkown)
+             Crawly.RequestsStorage.store(
+               :unkown,
+               Crawly.Utils.request_from_url("http://example.com")
+             )
  end

  test "Duplicated requests are filtered out", context do
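
The old assertion passed its arguments in the wrong order (`store(%{}, :unkown)`); under the new pattern-matched `store/2` that call would hit the `{:error, :not_request}` fallback instead of exercising the missing-worker path. Building a genuine request keeps the test on the intended branch. Roughly, in a hypothetical `iex` session (`:unkown` has no storage worker registered):

```elixir
iex> request = Crawly.Utils.request_from_url("http://example.com")
iex> Crawly.RequestsStorage.store(:unkown, request)
{:error, :storage_worker_not_running}
```
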
