WIP: Crawly custom settings
1. Extend the Crawly.Spider behaviour with an optional custom_settings/0
   callback (see the sketch after the file summary below)
2. Add a utility function to extract a setting from the global config
   or from a spider's custom settings
oltarasenko committed Apr 1, 2020
1 parent 0d13426 commit a6ecbf2
Showing 19 changed files with 124 additions and 129 deletions.
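To set the stage for the diffs below: after this change a spider can override global `:crawly` settings by implementing the new optional `custom_settings/0` callback. A minimal sketch of what that looks like, assuming the existing `base_url/0`, `init/0` and `parse_item/1` callbacks; the `MySpider` module name, URLs and values are made up for illustration:

```elixir
defmodule MySpider do
  # Hypothetical spider illustrating the optional custom_settings/0 callback
  # added in this commit; all names and values here are illustrative.
  @behaviour Crawly.Spider

  @impl Crawly.Spider
  def base_url(), do: "https://example.com"

  @impl Crawly.Spider
  def init(), do: [start_urls: ["https://example.com/blog"]]

  # Per-spider overrides; keys mirror the global :crawly config keys.
  @impl Crawly.Spider
  def custom_settings() do
    %{
      concurrent_requests_per_domain: 2,
      closespider_timeout: 10
    }
  end

  @impl Crawly.Spider
  def parse_item(_response) do
    # Return no items and no follow-up requests in this sketch.
    %{items: [], requests: []}
  end
end
```

Settings missing from the map fall back to the global config and then to the per-call default, as implemented in `Crawly.Utils.get_settings/3` further down.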
22 changes: 12 additions & 10 deletions config/test.exs
@@ -2,28 +2,30 @@ use Mix.Config

config :crawly,
manager_operations_timeout: 30_000,
# User agents which are going to be used with requests
user_agents: [
"Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.106 Safari/537.36 OPR/38.0.2220.41"
],

# Stop spider after scraping certain amount of items
closespider_itemcount: 100,
# Stop the spider if it does not crawl fast enough
closespider_timeout: 20,
concurrent_requests_per_domain: 5,
follow_redirect: true,
# Request middlewares

# Request middlewares
# User agents which are going to be used with requests
user_agents: [
"Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.106 Safari/537.36 OPR/38.0.2220.41"
],
middlewares: [
Crawly.Middlewares.DomainFilter,
Crawly.Middlewares.UniqueRequest,
Crawly.Middlewares.RobotsTxt,
Crawly.Middlewares.UserAgent
{Crawly.Middlewares.UserAgent, user_agents: ["My Custom Bot"]}
],
pipelines: [
Crawly.Pipelines.Validate,
Crawly.Pipelines.DuplicatesFilter,
{Crawly.Pipelines.Validate, fields: [:title, :url, :time, :author]},
{Crawly.Pipelines.DuplicatesFilter, item_id: :title},
Crawly.Pipelines.JSONEncoder
],
retry: [
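Worth noting in the config above: middleware and pipeline options are now passed as `{Module, opts}` tuples instead of separate global keys. A rough sketch of how such tuple options reach a pipeline's `run/3`, modeled on the Validate and CSVEncoder changes below (the `MyPipeline` module is hypothetical):

```elixir
defmodule MyPipeline do
  # Hypothetical pipeline; declared in config as:
  #   pipelines: [{MyPipeline, fields: [:title, :url]}]
  @behaviour Crawly.Pipeline

  @impl Crawly.Pipeline
  def run(item, state, opts \\ []) do
    # Tuple options arrive as a keyword list; normalize them to a map.
    opts = Enum.into(opts, %{fields: []})

    # For the declaration above, opts.fields == [:title, :url].
    if Enum.all?(opts.fields, &Map.has_key?(item, &1)) do
      {item, state}
    else
      # Drop items that are missing a required field.
      {false, state}
    end
  end
end
```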
2 changes: 1 addition & 1 deletion documentation/configuration.md
@@ -99,7 +99,7 @@ default: :disabled

An integer which specifies a number of items. If the spider scrapes more than that amount and those items are passed through the item pipeline, the spider will be closed. If set to :disabled, the spider will not be stopped.

### closespider_timeout :: pos_integer()
### closespider_timeout :: pos_integer() | :disabled

default: nil

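A config fragment showing how the widened type might be used; this assumes the items-per-minute check performed by Crawly.Manager further down, and the values are illustrative:

```elixir
config :crawly,
  # Stop a spider that scrapes fewer than 20 items per minute ...
  closespider_timeout: 20

# ... or opt out of the check entirely:
# config :crawly, closespider_timeout: :disabled
```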
2 changes: 1 addition & 1 deletion lib/crawly/data_storage/data_storage_worker.ex
@@ -33,7 +33,7 @@ defmodule Crawly.DataStorage.Worker do
end

def handle_cast({:store, item}, state) do
pipelines = Application.get_env(:crawly, :pipelines, [])
pipelines = Crawly.Utils.get_settings(:pipelines, state.spider_name, [])

state =
case Crawly.Utils.pipe(pipelines, item, state) do
45 changes: 23 additions & 22 deletions lib/crawly/manager.ex
@@ -31,6 +31,8 @@ defmodule Crawly.Manager do

use GenServer

alias Crawly.Utils

def start_link(spider_name) do
Logger.debug("Starting the manager for #{spider_name}")
GenServer.start_link(__MODULE__, spider_name)
@@ -45,8 +47,7 @@
Process.link(data_storage_pid)

# Start RequestsWorker for a given spider
{:ok, request_storage_pid} =
Crawly.RequestsStorage.start_worker(spider_name)
{:ok, request_storage_pid} = Crawly.RequestsStorage.start_worker(spider_name)

Process.link(request_storage_pid)

Expand All @@ -56,8 +57,7 @@ defmodule Crawly.Manager do
:ok = Crawly.RequestsStorage.store(spider_name, requests)

# Start workers
num_workers =
Application.get_env(:crawly, :concurrent_requests_per_domain, 4)
num_workers = Utils.get_settings(:concurrent_requests_per_domain, spider_name, 4)

worker_pids =
Enum.map(1..num_workers, fn _x ->
@@ -67,12 +67,16 @@
)
end)

Logger.debug(
"Started #{Enum.count(worker_pids)} workers for #{spider_name}"
)
Logger.debug("Started #{Enum.count(worker_pids)} workers for #{spider_name}")

# Schedule basic service operations for given spider manager
tref = Process.send_after(self(), :operations, get_timeout())
tref =
Process.send_after(
self(),
:operations,
Utils.get_settings(:manager_operations_timeout, spider_name, @timeout)
)

{:ok, %{name: spider_name, tref: tref, prev_scraped_cnt: 0}}
end

@@ -85,43 +89,40 @@
delta = items_count - state.prev_scraped_cnt
Logger.info("Current crawl speed is: #{delta} items/min")

case Application.get_env(:crawly, :closespider_itemcount, :disabled) do
case Utils.get_settings(:closespider_itemcount, state.name, :disabled) do
:disabled ->
:ignored

cnt when cnt < items_count ->
Logger.info(
"Stopping #{inspect(state.name)}, closespider_itemcount achieved"
)
Logger.info("Stopping #{inspect(state.name)}, closespider_itemcount achieved")

Crawly.Engine.stop_spider(state.name)

_ ->
:ignoring
end

# Close spider in case if it's not scraping itms fast enough
case Application.get_env(:crawly, :closespider_timeout) do
# Close the spider if it is not scraping items fast enough
case Utils.get_settings(:closespider_timeout, state.name, :disabled) do
:disabled ->
:ignoring

cnt when cnt > delta ->
Logger.info(
"Stopping #{inspect(state.name)}, itemcount timeout achieved"
)
Logger.info("Stopping #{inspect(state.name)}, itemcount timeout achieved")

Crawly.Engine.stop_spider(state.name)

_ ->
:ignoring
end

tref = Process.send_after(self(), :operations, get_timeout())
tref =
Process.send_after(
self(),
:operations,
Utils.get_settings(:manager_operations_timeout, state.name, @timeout)
)

{:noreply, %{state | tref: tref, prev_scraped_cnt: items_count}}
end

defp get_timeout() do
Application.get_env(:crawly, :manager_operations_timeout, @timeout)
end
end
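With the manager now going through `Crawly.Utils.get_settings/3`, each lookup resolves spider-level overrides first, then the global config, then the per-call default. Illustrative results, assuming the hypothetical `MySpider` sketch near the top of this page, the test config above, and no global `:fetcher` entry:

```elixir
# Spider-level custom_settings win over the global config:
Crawly.Utils.get_settings(:concurrent_requests_per_domain, MySpider, 4)
#=> 2

# No spider override, so the global test config value is used:
Crawly.Utils.get_settings(:closespider_itemcount, MySpider, :disabled)
#=> 100

# Neither a spider override nor a global value, so the default applies:
Crawly.Utils.get_settings(:fetcher, MySpider, {Crawly.Fetchers.HTTPoisonFetcher, []})
#=> {Crawly.Fetchers.HTTPoisonFetcher, []}
```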
5 changes: 2 additions & 3 deletions lib/crawly/middlewares/user_agent.ex
@@ -12,7 +12,7 @@ defmodule Crawly.Middlewares.UserAgent do
### Example Declaration
```
middlewares: [
{UserAgent, user_agents: ["My Custom Bot] }
{UserAgent, user_agents: ["My Custom Bot"] }
]
```
"""
@@ -24,8 +24,7 @@
new_headers = List.keydelete(request.headers, "User-Agent", 0)

user_agents =
Map.get(opts, :user_agents) ||
Application.get_env(:crawly, :user_agents, ["Crawly Bot 1.0"])
Map.get(opts, :user_agents, ["Crawly Bot 1.0"])

useragent = Enum.random(user_agents)

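With the global fallback removed, this middleware now takes user agents only from its tuple options, defaulting to "Crawly Bot 1.0". A config fragment consistent with the updated test config above:

```elixir
middlewares: [
  Crawly.Middlewares.DomainFilter,
  Crawly.Middlewares.UniqueRequest,
  # The global :user_agents key is no longer consulted by this middleware;
  # pass the list here instead.
  {Crawly.Middlewares.UserAgent, user_agents: ["My Custom Bot"]}
]
```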
2 changes: 1 addition & 1 deletion lib/crawly/pipelines/csv_encoder.ex
@@ -18,7 +18,7 @@ defmodule Crawly.Pipelines.CSVEncoder do
{false, state :: map} | {csv_line :: String.t(), state :: map}
def run(item, state, opts \\ []) do
opts = Enum.into(opts, %{fields: nil})
fields = Map.get(opts, :fields) || Application.get_env(:crawly, :item)
fields = Map.get(opts, :fields, [])

case fields do
:undefined ->
2 changes: 1 addition & 1 deletion lib/crawly/pipelines/duplicates_filter.ex
@@ -32,7 +32,7 @@ defmodule Crawly.Pipelines.DuplicatesFilter do
def run(item, state, opts \\ []) do
opts = Enum.into(opts, %{item_id: nil})

item_id = Map.get(opts, :item_id) || Application.get_env(:crawly, :item_id)
item_id = Map.get(opts, :item_id)

item_id = Map.get(item, item_id)

2 changes: 1 addition & 1 deletion lib/crawly/pipelines/validate.ex
@@ -27,7 +27,7 @@ defmodule Crawly.Pipelines.Validate do
@impl Crawly.Pipeline
def run(item, state, opts \\ []) do
opts = Enum.into(opts, %{fields: nil})
fields = Map.get(opts, :fields) || Application.get_env(:crawly, :item, [])
fields = Map.get(opts, :fields, [])

validation_result =
fields
1 change: 1 addition & 0 deletions lib/crawly/pipelines/write_to_file.ex
@@ -59,6 +59,7 @@ defmodule Crawly.Pipelines.WriteToFile do
def run(item, state, opts) do
opts = Enum.into(opts, %{folder: nil, extension: nil})

# TODO: maybe it's time to deprecate the global config in favor of option-based config?
global_config =
Application.get_env(
:crawly,
7 changes: 7 additions & 0 deletions lib/crawly/spider.ex
@@ -9,6 +9,10 @@ defmodule Crawly.Spider do
3. `parse_item/1` function which is responsible for parsing the downloaded
request and converting it into items which can be stored and new requests
which can be scheduled
4. `custom_settings/0` an optional callback which can be used to provide
   custom, spider-specific settings. Should define a map with custom
   settings and their values. These values take precedence over the
   global settings defined in the config.
"""

@callback init() :: [start_urls: list()]
@@ -18,4 +22,7 @@
@callback parse_item(response :: HTTPoison.Response.t()) ::
Crawly.ParsedItem.t()

@callback custom_settings() :: map()

@optional_callbacks custom_settings: 0
end
48 changes: 45 additions & 3 deletions lib/crawly/utils.ex
@@ -93,9 +93,9 @@ defmodule Crawly.Utils do
catch
error, reason ->
Logger.error(
"Pipeline crash: #{module}, error: #{inspect(error)}, reason: #{
inspect(reason)
}, args: #{inspect(args)}"
"Pipeline crash: #{module}, error: #{inspect(error)}, reason: #{inspect(reason)}, args: #{
inspect(args)
}"
)

{item, state}
@@ -113,4 +113,46 @@
def send_after(pid, message, timeout) do
Process.send_after(pid, message, timeout)
end

@doc """
Returns the value of a setting for a spider.

Spider-level `custom_settings/0` values take precedence over the global
`:crawly` application config; if neither defines the setting, `default`
is returned.
"""
@spec get_settings(setting_name, spider_name, default) :: result
when setting_name: atom(),
spider_name: atom(),
default: term(),
result: term()

def get_settings(setting_name, spider_name \\ nil, default \\ nil) do
global_setting = Application.get_env(:crawly, setting_name, default)

case get_spider_setting(setting_name, spider_name) do
nil ->
# No custom settings for a spider found
global_setting

custom_setting ->
custom_setting
end
end

##############################################################################
# Private functions
##############################################################################
@spec get_spider_setting(setting_name, spider_name) :: result
when setting_name: atom(),
spider_name: atom(),
result: nil | term()

defp get_spider_setting(_setting_name, nil), do: nil

defp get_spider_setting(setting_name, spider_name) do
case function_exported?(spider_name, :custom_settings, 0) do
true ->
Map.get(spider_name.custom_settings(), setting_name, nil)

false ->
nil
end
end
end
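A hypothetical ExUnit sketch of the lookup order implemented by `get_settings/3`; the `TestSpider` module and all values are made up, and `async: false` is used because the tests mutate the application environment:

```elixir
defmodule Crawly.UtilsSettingsTest do
  use ExUnit.Case, async: false

  defmodule TestSpider do
    # Only the callback relevant to get_settings/3 is defined here.
    def custom_settings(), do: %{closespider_itemcount: 5}
  end

  test "spider custom_settings take precedence over the global config" do
    Application.put_env(:crawly, :closespider_itemcount, 100)
    assert Crawly.Utils.get_settings(:closespider_itemcount, TestSpider) == 5
  end

  test "falls back to the global config, then to the default" do
    Application.put_env(:crawly, :closespider_itemcount, 100)
    assert Crawly.Utils.get_settings(:closespider_itemcount, nil) == 100

    Application.delete_env(:crawly, :closespider_itemcount)
    assert Crawly.Utils.get_settings(:closespider_itemcount, nil, 42) == 42
  end
end
```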
44 changes: 22 additions & 22 deletions lib/crawly/worker.ex
@@ -42,15 +42,12 @@ defmodule Crawly.Worker do

case :epipe.run(functions, {request, spider_name}) do
{:error, _step, reason, _step_state} ->

Logger.debug(
fn ->
"Crawly worker could not process the request to #{
inspect(request.url)
}
Logger.debug(fn ->
"Crawly worker could not process the request to #{
inspect(request.url)
}
reason: #{inspect(reason)}"
end
)
end)

@default_backoff

@@ -73,14 +70,16 @@
# Check if a spider-level fetcher is set; it overrides the globally
# configured fetcher. If not set, the globally configured fetcher is used
# (defaults to HTTPoisonFetcher).
{fetcher, options} = Application.get_env(
:crawly,
:fetcher,
{Crawly.Fetchers.HTTPoisonFetcher, []}
)

retry_options = Application.get_env(:crawly, :retry, [])
{fetcher, options} =
Crawly.Utils.get_settings(
:fetcher,
spider_name,
{Crawly.Fetchers.HTTPoisonFetcher, []}
)

retry_options = Crawly.Utils.get_settings(:retry, spider_name, [])
retry_codes = Keyword.get(retry_options, :retry_codes, [])

case fetcher.fetch(request, options) do
{:error, _reason} = err ->
:ok = maybe_retry_request(spider_name, request)
@@ -93,11 +92,11 @@
true ->
:ok = maybe_retry_request(spider_name, request)
{:error, :retry}

false ->
{:ok, {response, spider_name}}
end
end

end

@spec parse_item({response, spider_name}) :: result
@@ -141,7 +140,8 @@
fn request ->
request = Map.put(request, :prev_response, response)
Crawly.RequestsStorage.store(spider_name, request)
end)
end
)

# Process all items one by one
Enum.each(
@@ -157,7 +157,7 @@
## Retry a request if max retries allows to do so
defp maybe_retry_request(spider, request) do
retries = request.retries
retry_settings = Application.get_env(:crawly, :retry, Keyword.new())
retry_settings = Crawly.Utils.get_settings(:retry, spider, Keyword.new())

ignored_middlewares = Keyword.get(retry_settings, :ignored_middlewares, [])
max_retries = Keyword.get(retry_settings, :max_retries, 0)
@@ -169,16 +169,16 @@
middlewares = request.middlewares -- ignored_middlewares

request = %Crawly.Request{
request |
middlewares: middlewares,
retries: retries + 1
request
| middlewares: middlewares,
retries: retries + 1
}

:ok = Crawly.RequestsStorage.store(spider, request)

false ->
Logger.error("Dropping request to #{request.url}, (max retries)")
:ok
end

end
end
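Because the worker now resolves `:fetcher` and `:retry` through `Crawly.Utils.get_settings/3`, both can be overridden per spider. A hedged sketch of such an override inside a spider module; the `recv_timeout` fetcher option is assumed to be passed through to HTTPoison and all values are illustrative:

```elixir
# Inside a spider module implementing Crawly.Spider:
def custom_settings() do
  %{
    # Keys mirror the worker's lookups above.
    fetcher: {Crawly.Fetchers.HTTPoisonFetcher, [recv_timeout: 10_000]},
    retry: [retry_codes: [500, 503], max_retries: 3]
  }
end
```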
