Merge pull request #9 from oltarasenko/csv_encoder
Add CSV encoder pipeline
oltarasenko committed Aug 4, 2019
2 parents be21033 + b8bdcec commit 1965cb4
Showing 4 changed files with 142 additions and 5 deletions.
27 changes: 27 additions & 0 deletions docs/README.md
@@ -778,6 +778,33 @@ Defines a list of pipelines responsible for pre-processing all scraped
items. Any item that fails one of the pipelines is dropped. If unset,
all items are stored without modification.

Example configuration of item pipelines:
```
config :crawly,
pipelines: [
Crawly.Pipelines.Validate,
Crawly.Pipelines.DuplicatesFilter,
Crawly.Pipelines.JSONEncoder
]
```

#### CSVEncoder pipeline

It's possible to export data in CSV format if the pipelines are
defined as follows:
```
config :crawly,
pipelines: [
Crawly.Pipelines.Validate,
Crawly.Pipelines.DuplicatesFilter,
Crawly.Pipelines.CSVEncoder
],
output_format: "csv"
```

**NOTE**: The `output_format` option must be set to `csv` for the CSVEncoder pipeline to work.
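
For illustration, assuming the item fields were configured as
`config :crawly, item: [:title, :author]` (a hypothetical setup), the stored
file would start with a header row built from the field names, followed by
one quoted line per scraped item:
```
:title,:author
"Some title","me"
```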


### middlewares :: [module()]

default: [
30 changes: 26 additions & 4 deletions lib/crawly/data_storage/data_storage_worker.ex
@@ -35,15 +35,35 @@ defmodule Crawly.DataStorage.Worker do
# Specify a path where items are stored on filesystem
base_path = Application.get_env(:crawly, :base_store_path, "/tmp/")

format = Application.get_env(:crawly, :output_format, "jl")

# Open file descriptor to write items
{:ok, fd} =
File.open("#{base_path}#{inspect(spider_name)}.jl", [
File.open("#{base_path}#{inspect(spider_name)}.#{format}", [
:binary,
:write,
:delayed_write,
:utf8
])

case format do
"csv" ->
# Special case. Need to insert headers.
item =
Enum.reduce(Application.get_env(:crawly, :item), "", fn
field, "" ->
"#{inspect(field)}"

field, acc ->
acc <> "," <> "#{inspect(field)}"
end)

write_item(fd, item)

_other ->
:ok
end

{:ok, %Worker{fd: fd}}
end

@@ -67,7 +87,7 @@ defmodule Crawly.DataStorage.Worker do
{:reply, {:stored_items, state.stored_items}, state}
end

-  def handle_info({:'EXIT', _from, _reason}, state) do
+  def handle_info({:EXIT, _from, _reason}, state) do
File.close(state.fd)
{:stop, :normal, state}
end
@@ -89,9 +109,11 @@
catch
error, reason ->
stacktrace = :erlang.get_stacktrace()

Logger.error(
"Could not write item: #{inspect(error)}, reason: #{
inspect(reason)}, stacktrace: #{inspect(stacktrace)}
"Could not write item: #{inspect(error)}, reason: #{inspect(reason)}, stacktrace: #{
inspect(stacktrace)
}
"
)
end
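
A side note on the worker change above: the CSV header row is produced by
folding the configured `:item` fields into one comma-separated string. A
minimal sketch of that fold, runnable in `iex` (the field list mirrors the
one used in the tests below; it is not itself part of this commit):
```
fields = [:title, :author, :time, :url]

header =
  Enum.reduce(fields, "", fn
    # First field: no leading comma
    field, "" -> "#{inspect(field)}"
    # Subsequent fields: prepend a comma separator
    field, acc -> acc <> "," <> "#{inspect(field)}"
  end)

# header == ":title,:author,:time,:url"
```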
26 changes: 26 additions & 0 deletions lib/crawly/pipelines/csv_encoder.ex
@@ -0,0 +1,26 @@
defmodule Crawly.Pipelines.CSVEncoder do
@moduledoc """
Encodes a given item (map) into CSV
"""
@behaviour Crawly.Pipeline

@impl Crawly.Pipeline
def run(item, state) do
    # Default to :undefined so the clause below can match when :item is unset
    case Application.get_env(:crawly, :item, :undefined) do
:undefined ->
{false, state}

fields ->
new_item =
Enum.reduce(fields, "", fn
field, "" ->
"#{inspect(Map.get(item, field, ""))}"

field, acc ->
acc <> "," <> "#{inspect(Map.get(item, field, ""))}"
end)

{new_item, state}
end
end
end
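
For a quick sense of the pipeline's behaviour, a rough usage sketch (the item
and state values here are made up; `run/2` simply threads `state` through
unchanged):
```
Application.put_env(:crawly, :item, [:title, :author])

{line, _state} =
  Crawly.Pipelines.CSVEncoder.run(%{title: "Hello", author: "me"}, %{})

# line == "\"Hello\",\"me\""
```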
64 changes: 63 additions & 1 deletion test/data_storage_worker_test.exs
@@ -6,7 +6,8 @@ defmodule DataStorageWorkerTest do
{:ok, pid} = Crawly.DataStorage.start_worker(name)

on_exit(fn ->
-      :ok = DynamicSupervisor.terminate_child(Crawly.DataStorage.WorkersSup, pid)
+      :ok =
+        DynamicSupervisor.terminate_child(Crawly.DataStorage.WorkersSup, pid)
end)

{:ok, %{crawler: name}}
@@ -63,6 +64,14 @@ defmodule DataStorageWorkerTest do
end

test "Items are stored in JSON after json_encoder pipeline", context do
Application.put_env(:crawly, :pipelines, [
Crawly.Pipelines.Validate,
Crawly.Pipelines.DuplicatesFilter,
Crawly.Pipelines.JSONEncoder
])

Application.put_env(:crawly, :output_format, "jl")

item = %{
title: "test_title",
author: "me",
@@ -111,4 +120,57 @@
{:stored_items, 2} = Crawly.DataStorage.stats(context.crawler)
:meck.unload(Application)
end

describe "CSV encoder test" do
setup do
Application.put_env(:crawly, :pipelines, [
Crawly.Pipelines.Validate,
Crawly.Pipelines.DuplicatesFilter,
Crawly.Pipelines.CSVEncoder
])

Application.put_env(:crawly, :output_format, "csv")

name = :test_crawler_csv
{:ok, pid} = Crawly.DataStorage.start_worker(name)

on_exit(fn ->
Application.put_env(:crawly, :pipelines, [
Crawly.Pipelines.Validate,
Crawly.Pipelines.DuplicatesFilter,
Crawly.Pipelines.JSONEncoder
])

Application.put_env(:crawly, :output_format, "jl")

:ok =
DynamicSupervisor.terminate_child(Crawly.DataStorage.WorkersSup, pid)
end)

{:ok, %{crawler: name}}
end

test "Items are stored in CSV after csv pipeline", context do
item = %{
title: "test_title",
author: "me",
time: "Now",
url: "http://example.com"
}

:ok = Crawly.DataStorage.store(context.crawler, item)

# TODO: Rewrite to avoid sleep
Process.sleep(3000)
base_path = Application.get_env(:crawly, :base_store_path, "/tmp/")

IO.puts("Data: #{base_path}#{inspect(context.crawler)}.csv")
{:ok, data} = File.read("#{base_path}#{inspect(context.crawler)}.csv")

[header, data, _] = String.split(data, "\n")
assert header == ":title,:author,:time,:url"
assert data == "\"test_title\",\"me\",\"Now\",\"http://example.com\""
end
end
end
