diff --git a/docs/README.md b/docs/README.md
index 2b0e9687..c63971c9 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -778,6 +778,33 @@
 Defines a list of pipelines responsible for pre-processing all scraped
 items. All items not passing any of the pipelines are dropped. If
 unset, all items are stored without any modifications.
 
+Example configuration of item pipelines:
+```
+config :crawly,
+  pipelines: [
+    Crawly.Pipelines.Validate,
+    Crawly.Pipelines.DuplicatesFilter,
+    Crawly.Pipelines.JSONEncoder
+  ]
+```
+
+#### CSVEncoder pipeline
+
+It's possible to export data in CSV format if the pipelines are
+defined in the following way:
+```
+config :crawly,
+  pipelines: [
+    Crawly.Pipelines.Validate,
+    Crawly.Pipelines.DuplicatesFilter,
+    Crawly.Pipelines.CSVEncoder
+  ],
+  output_format: "csv"
+```
+
+**NOTE**: You must set `output_format: "csv"` for the CSVEncoder pipeline to work.
+
 ### middlewares :: [module()]
 
 default: [
diff --git a/lib/crawly/data_storage/data_storage_worker.ex b/lib/crawly/data_storage/data_storage_worker.ex
index 43ac5f7a..c1c197dd 100644
--- a/lib/crawly/data_storage/data_storage_worker.ex
+++ b/lib/crawly/data_storage/data_storage_worker.ex
@@ -35,15 +35,35 @@ defmodule Crawly.DataStorage.Worker do
     # Specify a path where items are stored on the filesystem
     base_path = Application.get_env(:crawly, :base_store_path, "/tmp/")
 
+    format = Application.get_env(:crawly, :output_format, "jl")
+
     # Open a file descriptor to write items
     {:ok, fd} =
-      File.open("#{base_path}#{inspect(spider_name)}.jl", [
+      File.open("#{base_path}#{inspect(spider_name)}.#{format}", [
         :binary,
         :write,
         :delayed_write,
         :utf8
       ])
 
+    case format do
+      "csv" ->
+        # Special case: write a CSV header row from the :item config fields.
+        item =
+          Enum.reduce(Application.get_env(:crawly, :item), "", fn
+            field, "" ->
+              "#{inspect(field)}"
+
+            field, acc ->
+              acc <> "," <> "#{inspect(field)}"
+          end)
+
+        write_item(fd, item)
+
+      _other ->
+        :ok
+    end
+
     {:ok, %Worker{fd: fd}}
   end
@@ -67,7 +87,7 @@ defmodule Crawly.DataStorage.Worker do
     {:reply, {:stored_items, state.stored_items}, state}
   end
 
-  def handle_info({:'EXIT', _from, _reason}, state) do
+  def handle_info({:EXIT, _from, _reason}, state) do
     File.close(state.fd)
     {:stop, :normal, state}
   end
@@ -89,9 +109,11 @@ defmodule Crawly.DataStorage.Worker do
     catch
       error, reason ->
         stacktrace = :erlang.get_stacktrace()
+
         Logger.error(
-          "Could not write item: #{inspect(error)}, reason: #{
-            inspect(reason)}, stacktrace: #{inspect(stacktrace)}
+          "Could not write item: #{inspect(error)}, reason: #{inspect(reason)}, stacktrace: #{
+            inspect(stacktrace)
+          }
           "
         )
     end
diff --git a/lib/crawly/pipelines/csv_encoder.ex b/lib/crawly/pipelines/csv_encoder.ex
new file mode 100644
index 00000000..985ae792
--- /dev/null
+++ b/lib/crawly/pipelines/csv_encoder.ex
@@ -0,0 +1,27 @@
+defmodule Crawly.Pipelines.CSVEncoder do
+  @moduledoc """
+  Encodes a given item (map) into a CSV line
+  """
+  @behaviour Crawly.Pipeline
+
+  @impl Crawly.Pipeline
+  def run(item, state) do
+    case Application.get_env(:crawly, :item) do
+      nil ->
+        # The :item field list is not configured, so drop the item
+        {false, state}
+
+      fields ->
+        new_item =
+          Enum.reduce(fields, "", fn
+            field, "" ->
+              "#{inspect(Map.get(item, field, ""))}"
+
+            field, acc ->
+              acc <> "," <> "#{inspect(Map.get(item, field, ""))}"
+          end)
+
+        {new_item, state}
+    end
+  end
+end
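For reviewers: a minimal sketch of what the new pipeline produces in isolation. The snippet is illustrative rather than part of the diff; the field list and item values are hypothetical, and it assumes the `:item` config key lists the fields to export, in order:
```
# Illustrative sketch with hypothetical values.
Application.put_env(:crawly, :item, [:title, :author])

item = %{title: "test_title", author: "me"}

# Each value is rendered with inspect/1, so strings come out quoted.
{csv_line, _state} = Crawly.Pipelines.CSVEncoder.run(item, %{})
# => {"\"test_title\",\"me\"", %{}}
```
One design note: because values go through `inspect/1` rather than a CSV library, quotes or commas embedded in a value are escaped Elixir-style (`\"`) instead of being doubled per RFC 4180, so strict CSV parsers may mis-read such rows.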
diff --git a/test/data_storage_worker_test.exs b/test/data_storage_worker_test.exs
index 2114760b..a53ab7aa 100644
--- a/test/data_storage_worker_test.exs
+++ b/test/data_storage_worker_test.exs
@@ -6,7 +6,8 @@ defmodule DataStorageWorkerTest do
     {:ok, pid} =
       Crawly.DataStorage.start_worker(name)
 
     on_exit(fn ->
-      :ok = DynamicSupervisor.terminate_child(Crawly.DataStorage.WorkersSup, pid)
+      :ok =
+        DynamicSupervisor.terminate_child(Crawly.DataStorage.WorkersSup, pid)
     end)
 
     {:ok, %{crawler: name}}
@@ -63,6 +64,14 @@ defmodule DataStorageWorkerTest do
   end
 
   test "Items are stored in JSON after json_encoder pipeline", context do
+    Application.put_env(:crawly, :pipelines, [
+      Crawly.Pipelines.Validate,
+      Crawly.Pipelines.DuplicatesFilter,
+      Crawly.Pipelines.JSONEncoder
+    ])
+
+    Application.put_env(:crawly, :output_format, "jl")
+
     item = %{
       title: "test_title",
       author: "me",
@@ -111,4 +120,57 @@ defmodule DataStorageWorkerTest do
     {:stored_items, 2} = Crawly.DataStorage.stats(context.crawler)
     :meck.unload(Application)
   end
+
+  describe "CSV encoder test" do
+    setup do
+      Application.put_env(:crawly, :pipelines, [
+        Crawly.Pipelines.Validate,
+        Crawly.Pipelines.DuplicatesFilter,
+        Crawly.Pipelines.CSVEncoder
+      ])
+
+      Application.put_env(:crawly, :output_format, "csv")
+
+      name = :test_crawler_csv
+      {:ok, pid} = Crawly.DataStorage.start_worker(name)
+
+      on_exit(fn ->
+        # Restore the default pipeline configuration for the other tests
+        Application.put_env(:crawly, :pipelines, [
+          Crawly.Pipelines.Validate,
+          Crawly.Pipelines.DuplicatesFilter,
+          Crawly.Pipelines.JSONEncoder
+        ])
+
+        Application.put_env(:crawly, :output_format, "jl")
+
+        :ok =
+          DynamicSupervisor.terminate_child(Crawly.DataStorage.WorkersSup, pid)
+      end)
+
+      {:ok, %{crawler: name}}
+    end
+
+    test "Items are stored in CSV after csv pipeline", context do
+      item = %{
+        title: "test_title",
+        author: "me",
+        time: "Now",
+        url: "http://example.com"
+      }
+
+      :ok = Crawly.DataStorage.store(context.crawler, item)
+
+      # TODO: Rewrite to avoid sleep
+      Process.sleep(3000)
+      base_path = Application.get_env(:crawly, :base_store_path, "/tmp/")
+
+      IO.puts("Data: #{base_path}#{inspect(context.crawler)}.csv")
+      {:ok, data} = File.read("#{base_path}#{inspect(context.crawler)}.csv")
+
+      [header, data, _] = String.split(data, "\n")
+      assert header == ":title,:author,:time,:url"
+      assert data == "\"test_title\",\"me\",\"Now\",\"http://example.com\""
+    end
+  end
 end
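For context on the assertions above: assuming `:item` is set to `[:title, :author, :time, :url]` in the test config (the field order the header assertion implies), the worker writes the header row at startup and the encoder appends one line per item, so the file under test should contain (illustrative IEx session, not part of the diff):
```
iex> File.read!("/tmp/:test_crawler_csv.csv")
":title,:author,:time,:url\n\"test_title\",\"me\",\"Now\",\"http://example.com\"\n"
```
The odd-looking file name comes from `inspect(spider_name)`: the worker interpolates the spider atom with its leading colon, so `:test_crawler_csv` produces `/tmp/:test_crawler_csv.csv`.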