Merge 6ce3192 into 838049e
Ziinc committed Dec 20, 2019
2 parents 838049e + 6ce3192 commit 979c9ce
Showing 17 changed files with 477 additions and 91 deletions.
93 changes: 78 additions & 15 deletions documentation/basic_concepts.md

For now, only one storage backend is supported (writing to disc). In the future, Crawly will also support Amazon S3, SQL, and other storage backends.

## The `Crawly.Pipeline` Behaviour

Crawly uses the concept of pipelines for processing the elements sent through the system. This applies to both request and scraped-item manipulation. Conceptually, a request goes through a series of manipulations before the response is fetched. The response then goes through another, separate series of manipulations.

Importantly, the way requests and responses are manipulated is abstracted into the `Crawly.Pipeline` behaviour. This allows for a modular system for declaring changes. Note that each declared `Crawly.Pipeline` module is applied sequentially through the `Crawly.Utils.pipe/3` function.

### Writing Tests for Custom Pipelines

Modules that implement the `Crawly.Pipeline` behaviour can make use of the `Crawly.Utils.pipe/3` function to test for expected behaviour. Refer to the function documentation for more information and examples.
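
For instance, assuming `Crawly.Utils.pipe/3` takes the list of pipeline declarations, the item, and the state (returning the transformed `{item, state}` tuple), a test for the `MyCustomPipeline` module defined later in this document might look like this sketch:

```elixir
defmodule MyCustomPipelineTest do
  use ExUnit.Case

  test "item passes through the pipeline unchanged when valid" do
    pipelines = [{MyCustomPipeline, my_option: "value"}]
    item = %{title: "some title", url: "https://example.com"}

    # Crawly.Utils.pipe/3 applies each declared pipeline module sequentially
    {processed_item, _state} = Crawly.Utils.pipe(pipelines, item, %{})

    assert processed_item == item
  end
end
```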

## Request Middlewares

These are configured under the `middlewares` option. See [configuration](./configuration.md) for more details.

> **Middleware:** A pipeline module that modifies a request. It implements the `Crawly.Pipeline` behaviour.

Middlewares can make changes to the underlying request, a `Crawly.Request` struct. The request, along with any options specified, is then passed to the fetcher (currently `HTTPoison`). The available configuration options should correspond to the underlying options of the fetcher in use.

Note that all request configuration options for `HTTPoison`, such as proxy, ssl, etc., can be configured through `Crawly.Request.options`.

Built-in middlewares:

1. `Crawly.Middlewares.DomainFilter` - this middleware will disable scheduling for all requests leading outside of the crawled site.
2. `Crawly.Middlewares.RobotsTxt` - this middleware ensures that Crawly respects the robots.txt defined by the target website.
3. `Crawly.Middlewares.UniqueRequest` - this middleware ensures that Crawly does not schedule the same URL (request) multiple times.
4. `Crawly.Middlewares.UserAgent` - this middleware sets the User-Agent HTTP header. It allows user agents to be rotated if the corresponding setting is defined as a list.
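
For example, a project could enable all of the built-in middlewares in its configuration (see [configuration](./configuration.md) for the full option reference):

```elixir
config :crawly,
  middlewares: [
    Crawly.Middlewares.DomainFilter,
    Crawly.Middlewares.RobotsTxt,
    Crawly.Middlewares.UniqueRequest,
    Crawly.Middlewares.UserAgent
  ]
```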

### Creating a Custom Request Middleware

See the [Creating a Custom Pipeline Module](#creating-a-custom-pipeline-module) section below. Request middlewares implement the same `Crawly.Pipeline` behaviour as item pipelines, and a request middleware example (adding a proxy) is included at the end of that section.

## Item Pipelines

> **Item Pipelines:** A pipeline module that modifies and pre-processes a scraped item before it is stored.

Built-in item pipelines:

1. `Crawly.Pipelines.Validate` - validates that a given item has all the required fields. All items which don't have all required fields are dropped.
2. `Crawly.Pipelines.DuplicatesFilter` - filters out items which are already stored in the system.
3. `Crawly.Pipelines.JSONEncoder` - encodes a given item into JSON.
4. `Crawly.Pipelines.CSVEncoder` - encodes a given item into a CSV line.
5. `Crawly.Pipelines.WriteToFile` - writes encoded items to a file.

The list of item pipelines used with a given project is defined in the project settings.
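
For example, a project could declare its item pipelines in `config/config.exs` along these lines (the exact list and options depend on the project):

```elixir
config :crawly,
  pipelines: [
    {Crawly.Pipelines.Validate, fields: [:title, :url]},
    {Crawly.Pipelines.DuplicatesFilter, item_id: :title},
    Crawly.Pipelines.JSONEncoder
  ]
```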

## Creating a Custom Pipeline Module

Both item pipelines and request middlewares follow the `Crawly.Pipeline` behaviour. As such, when creating your custom pipeline, it will need to implement the required callback `c:Crawly.Pipeline.run/3`.

The `c:Crawly.Pipeline.run/3` callback receives the processed item, `item`, from the previous pipeline module as the first argument. The second argument, `state`, is a map containing information such as the spider which the item originated from (under the `:spider_name` key), and may optionally store pipeline information. Finally, `opts` is a keyword list containing any tuple-based options.

### Passing Configuration Options To Your Pipeline

Tuple-based option declaration is supported, similar to how a `GenServer` is declared in a supervision tree. This allows for pipeline reusability for different use cases.

For example, you can pass options in this way through your pipeline declaration:

```elixir
pipelines: [
  {MyCustomPipeline, my_option: "value"}
]
```

In your pipeline, you will then receive the options passed through the `opts` argument.

```elixir
defmodule MyCustomPipeline do
  @impl Crawly.Pipeline
  def run(item, state, opts \\ []) do
    # opts will be the keyword list [my_option: "value"] declared above
    IO.inspect(opts)
    # Do something with the item, then pass it on to the next pipeline
    {item, state}
  end
end
```

### Best Practices

The use of global configs is discouraged, hence one should pass options through a tuple-based pipeline declaration where possible.

When storing information in the `state` map, ensure that the state is namespaced with the pipeline name so as to avoid key clashes. For example, to store state from `MyEctoPipeline`, store it under the key `:my_ecto_pipeline_my_state`.
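
A minimal sketch of this convention, using a hypothetical `MyApp.MyStatefulPipeline` that counts processed items under its own namespaced key:

```elixir
defmodule MyApp.MyStatefulPipeline do
  @impl Crawly.Pipeline
  def run(item, state, _opts \\ []) do
    # Keep this pipeline's data under its own namespaced key to avoid clashes
    pipeline_state = Map.get(state, :my_stateful_pipeline_state, %{count: 0})
    new_state = Map.put(state, :my_stateful_pipeline_state, %{count: pipeline_state.count + 1})
    {item, new_state}
  end
end
```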

### Item Pipeline Example - Ecto Storage Pipeline

```elixir
defmodule MyApp.MyEctoPipeline do
  @impl Crawly.Pipeline
  def run(item, state, _opts \\ []) do
    case MyApp.insert_with_ecto(item) do
      {:ok, _} ->
        # insert successful, carry on with the pipeline
        {item, state}

      {:error, _} ->
        # insert failed, drop the item from the pipeline
        {false, state}
    end
  end
end
```
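
To enable this pipeline, it would then be added to the `pipelines` list in the project configuration, for example:

```elixir
config :crawly,
  pipelines: [
    {Crawly.Pipelines.Validate, fields: [:title, :url]},
    MyApp.MyEctoPipeline
  ]
```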

### Request Middleware Example - Add a Proxy

Following the [documentation](https://hexdocs.pm/httpoison/HTTPoison.Request.html) for proxy options of a request in `HTTPoison`, we can do the following:

```elixir
defmodule MyApp.MyProxyMiddleware do
  @impl Crawly.Pipeline
  def run(request, state, opts \\ []) do
    # Set default proxy and proxy_auth to nil
    opts = Enum.into(opts, %{proxy: nil, proxy_auth: nil})

    case opts.proxy do
      nil ->
        # No proxy configured, pass the request through unchanged
        {request, state}

      _proxy ->
        old_options = request.options
        new_options = [proxy: opts.proxy, proxy_auth: opts.proxy_auth]
        new_request = Map.put(request, :options, old_options ++ new_options)
        {new_request, state}
    end
  end
end
```
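
Since this middleware reads its settings from `opts`, it can be declared with tuple-based options. The proxy host, port, and credentials below are placeholders:

```elixir
config :crawly,
  middlewares: [
    Crawly.Middlewares.DomainFilter,
    Crawly.Middlewares.UniqueRequest,
    Crawly.Middlewares.UserAgent,
    {MyApp.MyProxyMiddleware, proxy: {"127.0.0.1", 8080}, proxy_auth: {"user", "password"}}
  ]
```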
36 changes: 23 additions & 13 deletions documentation/configuration.md
A basic example:

```elixir
config :crawly,
  pipelines: [
    # my pipelines
  ],
  middlewares: [
    # my middlewares
  ]
```

## Options
fields are added to the following item (or if the values of
required fields are "" or nil), the item will be dropped. This setting
is used by the `Crawly.Pipelines.Validate` pipeline.
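
For example, a global declaration of required fields might look like this (illustrative field names):

```elixir
config :crawly,
  item: [:title, :author, :time, :url]
```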

> **Deprecated**: This has been deprecated in favour of tuple-based pipeline configuration instead of global configuration, as of `0.7.0`.

### `item_id` :: atom()

default: nil
field is the SKU. This setting is used in
the `Crawly.Pipelines.DuplicatesFilter` pipeline. If unset, the related
middleware is effectively disabled.
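
For example, if scraped products are uniquely identified by their SKU (illustrative field name):

```elixir
config :crawly,
  item_id: :sku
```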

> **Deprecated**: This has been deprecated in favour of tuple-based pipeline configuration instead of global configuration, as of `0.7.0`.

### `pipelines` :: [module()]

default: []
Example configuration of item pipelines:
```elixir
config :crawly,
  pipelines: [
    {Crawly.Pipelines.Validate, fields: [:id, :date]},
    {Crawly.Pipelines.DuplicatesFilter, item_id: :id},
    Crawly.Pipelines.JSONEncoder,
    {Crawly.Pipelines.WriteToFile, extension: "jl", folder: "/tmp"} # NEW IN 0.6.0
  ]
```

### `middlewares` :: [module()]

The default middlewares are as follows:

```elixir
config :crawly,
  middlewares: [
    Crawly.Middlewares.DomainFilter,
    Crawly.Middlewares.UniqueRequest,
    Crawly.Middlewares.RobotsTxt,
    Crawly.Middlewares.UserAgent
  ]
```

Defines a list of middlewares responsible for pre-processing requests. If any request from the `Crawly.Spider` does not pass a middleware, it is dropped.

11 changes: 4 additions & 7 deletions documentation/quickstart.md
```elixir
  concurrent_requests_per_domain: 8,
  follow_redirects: true,
  closespider_itemcount: 1000,
  middlewares: [
    Crawly.Middlewares.DomainFilter,
    Crawly.Middlewares.UniqueRequest,
    Crawly.Middlewares.UserAgent
  ],
  pipelines: [
    {Crawly.Pipelines.Validate, fields: [:title, :url]},
    {Crawly.Pipelines.DuplicatesFilter, item_id: :title},
    {Crawly.Pipelines.CSVEncoder, fields: [:title, :url]},
    {Crawly.Pipelines.WriteToFile, extension: "csv", folder: "/tmp"}
  ]
```
5. Start the Crawl:
15 changes: 13 additions & 2 deletions lib/crawly/middlewares/domain_filter.ex
defmodule Crawly.Middlewares.DomainFilter do
@moduledoc """
Filters out requests which are going outside of the crawled domain.

The domain that is used to compare against the request url is obtained from the spider's `c:Crawly.Spider.base_url` callback.

Does not accept any options. Tuple-based configuration options will be ignored.
### Example Declaration
```
middlewares: [
Crawly.Middlewares.DomainFilter
]
```
"""

@behaviour Crawly.Pipeline
require Logger

def run(request, state, _opts \\ []) do
base_url = state.spider_name.base_url()

case String.contains?(request.url, base_url) do
5 changes: 5 additions & 0 deletions lib/crawly/pipeline.ex
defmodule Crawly.Pipeline do
@callback run(item :: map, state :: map()) ::
{new_item :: map, new_state :: map}
| {false, new_state :: map}

@callback run(item :: map, state :: map(), args :: list(any())) ::
{new_item :: map, new_state :: map}
| {false, new_state :: map}
@optional_callbacks run: 3
end
26 changes: 23 additions & 3 deletions lib/crawly/pipelines/csv_encoder.ex
defmodule Crawly.Pipelines.CSVEncoder do
@moduledoc """
Encodes a given item (map) into CSV. Does not flatten nested maps.
### Options
If no fields are given, the item is dropped from the pipeline.
- `:fields`, required: The fields to extract out from the scraped item. Falls back to the global config `:item`.
### Example Usage
iex> item = %{my: "first", other: "second", ignore: "this_field"}
iex> Crawly.Pipelines.CSVEncoder.run(item, %{}, fields: [:my, :other])
{"first,second", %{}}
"""
@behaviour Crawly.Pipeline
require Logger

@impl Crawly.Pipeline
@spec run(map, map, fields: list(atom)) ::
{false, state :: map} | {csv_line :: String.t(), state :: map}
def run(item, state, opts \\ []) do
opts = Enum.into(opts, %{fields: nil})
fields = Map.get(opts, :fields) || Application.get_env(:crawly, :item)

case fields do
:undefined ->
# only for when neither tuple nor global config is provided

Logger.info(
"Dropping item: #{inspect(item)}. Reason: No fields declared for CSVEncoder"
)

{false, state}

fields ->
Expand Down
39 changes: 30 additions & 9 deletions lib/crawly/pipelines/duplicates_filter.ex
defmodule Crawly.Pipelines.DuplicatesFilter do
@moduledoc """
Filters out duplicated items based on the provided `item_id`.
This pipeline uses Crawly.DataStorageWorker process state in order to store
ids of already seen items. For now they are stored only in memory.
Stores identifier values in state under the `:duplicates_filter` key.
The field responsible for identifying duplicates is specified using
:crawly.item_id setting.
### Options
If item unique identifier is not provided, this pipeline does nothing.
- `:item_id`, required: Designates a field to be used to check for duplicates. Falls back to global config `:item_id`.
### Example Usage
```
iex> item = %{my: "item"}
iex> {_unchanged, new_state} = DuplicatesFilter.run(item, %{}, item_id: :my)
# Rerunning the item through the pipeline will drop the item
iex> DuplicatesFilter.run(item, new_state, item_id: :my)
{false, %{
duplicates_filter: %{"item" => true}
}}
```
"""
@behaviour Crawly.Pipeline

require Logger

@impl Crawly.Pipeline
@spec run(map, map, item_id: atom) ::
{false, state :: map}
| {item :: map,
state :: %{duplicates_filter: %{required(String.t()) => boolean}}}
def run(item, state, opts \\ []) do
opts = Enum.into(opts, %{item_id: nil})

item_id = Map.get(opts, :item_id) || Application.get_env(:crawly, :item_id)

item_id = Map.get(item, item_id)

case item_id do
nil ->
Logger.info(
"Duplicates filter pipeline is inactive, item_id field is required
to make it operational"
"Duplicates filter pipeline is inactive, item_id option is required
to make it operational."
)

{item, state}

_ ->
do_run(item_id, item, state)
end
17 changes: 16 additions & 1 deletion lib/crawly/pipelines/json_encoder.ex
defmodule Crawly.Pipelines.JSONEncoder do
@moduledoc """
Encodes a given item (map) into JSON
No options are available for this pipeline.
### Example Declaration
```
pipelines: [
Crawly.Pipelines.JSONEncoder
]
```
### Example Usage
```
iex> JSONEncoder.run(%{my: "field"}, %{})
{"{\"my\":\"field\"}", %{}}
```
"""
@behaviour Crawly.Pipeline

require Logger

@impl Crawly.Pipeline
def run(item, state, _opts \\ []) do
case Poison.encode(item) do
{:ok, new_item} ->
{new_item, state}
