
Commit

Merge 37832ec into 838049e
oltarasenko committed Dec 20, 2019
2 parents 838049e + 37832ec commit 20cb4c0
Showing 9 changed files with 182 additions and 69 deletions.
1 change: 1 addition & 0 deletions config/config.exs
@@ -32,6 +32,7 @@ use Mix.Config
config :crawly, Crawly.Worker, client: HTTPoison

config :crawly,
max_retries: 3,
# User agents which are going to be used with requests
user_agents: [
"Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0",
1 change: 1 addition & 0 deletions config/test.exs
@@ -15,6 +15,7 @@ config :crawly,
item: [:title, :author, :time, :url],
# Identifier which is used to filter out duplicates
item_id: :title,
max_retries: 2,
# Stop spider after scraping certain amount of items
closespider_itemcount: 100,
# Stop spider if it does not crawl fast enough
10 changes: 9 additions & 1 deletion documentation/configuration.md
@@ -104,11 +104,19 @@ default: 4

The maximum number of concurrent (i.e. simultaneous) requests that will be performed by the Crawly workers.


### max_retries :: pos_integer()

default: 3

Controls the number of retries Crawly makes in order to fetch a request
successfully (i.e. to get an HTTP 200 response).
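
A minimal sketch of setting this option in an application's config, mirroring the change to config/config.exs in this commit (3 is also the built-in default):

```elixir
# config/config.exs — sketch; only the retry option is shown here
use Mix.Config

config :crawly,
  max_retries: 3
```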

### proxy :: binary()

Requests can be directed through a proxy; this sets the proxy option for every request.
It is possible to set the proxy via the proxy value of the Crawly config, for example:

```
config :crawly,
proxy: "<proxy_host>:<proxy_port>",
9 changes: 8 additions & 1 deletion lib/crawly/middlewares/unique_request.ex
@@ -1,9 +1,15 @@
defmodule Crawly.Middlewares.UniqueRequest do
@moduledoc """
-Avoid scheduling requests for the same pages.
+Avoid scheduling requests for the same pages. However, if a retry is requested,
+the request is still allowed.
"""
require Logger

# Allow a request to be re-scheduled if retries are required
def run(%Crawly.Request{retries: retries} = request, state) when retries > 0 do
{request, state}
end

def run(request, state) do
unique_request_seen_requests =
Map.get(state, :unique_request_seen_requests, %{})
@@ -30,4 +36,5 @@ defmodule Crawly.Middlewares.UniqueRequest do
{false, state}
end
end

end
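A small sketch of what the new first clause guarantees (the URL is hypothetical): a request carrying retries > 0 is returned unchanged instead of being checked against previously seen pages.

```elixir
request = %Crawly.Request{url: "https://example.com/page", retries: 1}

# Matches the `when retries > 0` clause, so both the request and the state
# come back untouched, even if this URL was scheduled before.
{^request, _state} = Crawly.Middlewares.UniqueRequest.run(request, %{})
```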
5 changes: 4 additions & 1 deletion lib/crawly/request.ex
@@ -4,18 +4,21 @@ defmodule Crawly.Request do
Defines Crawly request structure.
"""
-defstruct url: nil, headers: [], prev_response: nil, options: []
+defstruct url: nil, headers: [], prev_response: nil, options: [], retries: 0

@type header() :: {key(), value()}
@typep key :: binary()
@typep value :: binary()

@type retries :: non_neg_integer()

@type option :: {atom(), binary()}

@type t :: %__MODULE__{
url: binary(),
headers: [header()],
prev_response: %{},
retries: retries(),
options: [option()]
}
end
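For illustration (hypothetical URL), the new field defaults to 0 and is bumped with the usual struct-update syntax when a request is re-scheduled:

```elixir
request = %Crawly.Request{url: "https://example.com"}
request.retries
# => 0

# Re-scheduling a failed request increments the counter, as the worker does below
retried = %Crawly.Request{request | retries: request.retries + 1}
retried.retries
# => 1
```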
4 changes: 3 additions & 1 deletion lib/crawly/requests_storage/requests_storage_worker.ex
@@ -33,7 +33,9 @@ defmodule Crawly.RequestsStorage.Worker do
@spec store(spider_name, request) :: :ok
when spider_name: atom(),
request: Crawly.Request.t()
-def store(pid, request), do: GenServer.call(pid, {:store, request})
+def store(pid, request) do
+GenServer.call(pid, {:store, request})
+end

@doc """
Pop a request out of requests storage
12 changes: 12 additions & 0 deletions lib/crawly/utils.ex
@@ -77,4 +77,16 @@ defmodule Crawly.Utils do

pipe(pipelines, new_item, new_state)
end

@doc """
A wrapper over Process.send after
This wrapper should be used instead of Process.send_after, so it's possible
to mock the last one. To avoid race conditions on worker's testing.
"""
@spec send_after(pid(), term(), pos_integer()) :: reference()
def send_after(pid, message, timeout) do
Process.send_after(pid, message, timeout)
end

end
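A sketch of why the wrapper matters for testing. Assuming a mocking library such as :meck (an assumption for illustration; the exact test setup is not part of this diff), the wrapper can be stubbed so a worker under test does not keep rescheduling itself:

```elixir
# In a test: stub the wrapper instead of Process.send_after/3 itself
:meck.new(Crawly.Utils, [:passthrough])

:meck.expect(Crawly.Utils, :send_after, fn _pid, _message, _timeout ->
  # Behave like Process.send_after/3 (returns a reference) but send nothing
  make_ref()
end)

# ... run the worker-related assertions here ...

:meck.unload(Crawly.Utils)
```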
87 changes: 51 additions & 36 deletions lib/crawly/worker.ex
@@ -17,47 +17,44 @@ defmodule Crawly.Worker do
end

def init([spider_name]) do
-Process.send_after(self(), :work, @default_backoff)
+Crawly.Utils.send_after(self(), :work, @default_backoff)

{:ok, %Crawly.Worker{spider_name: spider_name, backoff: @default_backoff}}
end

def handle_info(:work, state) do
-%{spider_name: spider_name, backoff: backoff} = state
+new_backoff = do_work(state.spider_name, state.backoff)
+Crawly.Utils.send_after(self(), :work, new_backoff)
+{:noreply, %{state | backoff: new_backoff}}
+end

+defp do_work(spider_name, backoff) do
# Get a request from requests storage.
-new_backoff =
-case Crawly.RequestsStorage.pop(spider_name) do
-nil ->
-# Slow down a bit when there are no new URLs
-backoff * 2
-
-request ->
-# Process the request using following group of functions
-functions = [
-{:get_response, &get_response/1},
-{:parse_item, &parse_item/1},
-{:process_parsed_item, &process_parsed_item/1}
-]
-
-case :epipe.run(functions, {request, spider_name}) do
-{:error, _step, reason, _step_state} ->
-# TODO: Add retry logic
-Logger.error(
-fn ->
-"Crawly worker could not process the request to #{inspect(request.url)}
+case Crawly.RequestsStorage.pop(spider_name) do
+nil ->
+# Slow down a bit when there are no new URLs
+backoff * 2
+
+request ->
+# Process the request using following group of functions
+functions = [
+{:get_response, &get_response/1},
+{:parse_item, &parse_item/1},
+{:process_parsed_item, &process_parsed_item/1}
+]
+
+case :epipe.run(functions, {request, spider_name}) do
+{:error, _step, reason, _step_state} ->
+Logger.error(
+fn ->
+"Crawly worker could not process the request to #{inspect(request.url)}
reason: #{inspect(reason)}"
-end)
-@default_backoff
-{:ok, _result} ->
-@default_backoff
-end
-
-end
-
-Process.send_after(self(), :work, new_backoff)
-
-{:noreply, %{state | backoff: new_backoff}}
+end)
+@default_backoff
+{:ok, _result} ->
+@default_backoff
+end
+end
end

@spec get_response({request, spider_name}) :: result
@@ -67,11 +64,12 @@
result: {:ok, response, spider_name} | {:error, term()}
defp get_response({request, spider_name}) do
case HTTPoison.get(request.url, request.headers, request.options) do
-{:ok, response} ->
+{:ok, %HTTPoison.Response{status_code: 200} = response} ->
{:ok, {response, spider_name}}

-{:error, _reason} = response ->
-response
+non_successful_response ->
+:ok = maybe_retry_request(spider_name, request)
+{:error, non_successful_response}
end
end

@@ -145,4 +143,21 @@ defmodule Crawly.Worker do

{:ok, :done}
end

## Retry a request if the number of retries made so far allows us to do so
defp maybe_retry_request(spider, %Crawly.Request{retries: retries} = request) do
max_retries = Application.get_env(:crawly, :max_retries, 3)

case retries <= max_retries do
true ->
Logger.info("Request to #{request.url} is scheduled for retry")
:ok = Crawly.RequestsStorage.store(
spider,
%Crawly.Request{request | retries: retries + 1}
)
false ->
Logger.info("Dropping request to #{request.url} (max retries reached)")
:ok
end
end
end
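Reading maybe_retry_request/2 together with the new get_response/1 clauses: every non-200 fetch re-stores the request with retries incremented, until the counter exceeds max_retries. A small sketch of the resulting attempt count under the default of 3 (illustrative, not part of the commit):

```elixir
max_retries = 3

# Retry counters 0..3 pass the `retries <= max_retries` check and get
# re-scheduled; if the fetch made with retries == 4 also fails, the request
# is dropped — i.e. one initial fetch plus four retries.
attempts = Enum.count(0..max_retries) + 1
# => 5
```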
