Skip to content

Commit

Permalink
fix some test error & compile time warning (#177)
Browse files Browse the repository at this point in the history
* fix: test error

* style: mix format
  • Loading branch information
feng19 committed Mar 23, 2021
1 parent 60c5e1a commit 0a28302
Show file tree
Hide file tree
Showing 19 changed files with 200 additions and 179 deletions.
1 change: 1 addition & 0 deletions lib/crawly.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ defmodule Crawly do
@type parsed_item_result :: Crawly.ParsedItem.t()
@type parsed_items :: list(any())
@type pipeline_state :: %{optional(atom()) => any()}
@type spider :: module()

@spec fetch(url, opts) ::
HTTPoison.Response.t()
Expand Down
4 changes: 2 additions & 2 deletions lib/crawly/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ defmodule Crawly.Application do
# List all child processes to be supervised

children = [
worker(Crawly.Engine, []),
supervisor(Crawly.EngineSup, []),
{Crawly.Engine, []},
{DynamicSupervisor, strategy: :one_for_one, name: Crawly.EngineSup},
{Crawly.DataStorage, []},
{Crawly.RequestsStorage, []},
{DynamicSupervisor,
Expand Down
60 changes: 34 additions & 26 deletions lib/crawly/engine.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ defmodule Crawly.Engine do

@type t :: %__MODULE__{
started_spiders: started_spiders(),
known_spiders: [module()]
known_spiders: [Crawly.spider()]
}
@type started_spiders() :: %{optional(module()) => identifier()}

@type started_spiders() :: %{optional(Crawly.spider()) => identifier()}

@type spider_info() :: %{
name: module(),
name: Crawly.spider(),
status: :stopped | :started,
pid: identifier() | nil
}
Expand All @@ -38,9 +39,8 @@ defmodule Crawly.Engine do
If the 2nd positional argument is a binary, it will be set as the `:crawl_id`. Deprecated, will be removed in the future.
"""
@type crawl_id_opt :: {:crawl_id, binary()}
@spec start_spider(spider_name, opts) :: result
when spider_name: module(),
opts: [crawl_id_opt],
@spec start_spider(Crawly.spider(), opts) :: result
when opts: [crawl_id_opt],
result:
:ok
| {:error, :spider_already_started}
Expand Down Expand Up @@ -81,8 +81,7 @@ defmodule Crawly.Engine do
)
end

@spec get_manager(module()) ::
pid() | {:error, :spider_not_found}
@spec get_manager(Crawly.spider()) :: pid() | {:error, :spider_not_found}
def get_manager(spider_name) do
case Map.fetch(running_spiders(), spider_name) do
:error ->
Expand All @@ -101,7 +100,7 @@ defmodule Crawly.Engine do
end
end

@spec stop_spider(module(), reason) :: result
@spec stop_spider(Crawly.spider(), reason) :: result
when reason: :itemcount_limit | :itemcount_timeout | atom(),
result:
:ok | {:error, :spider_not_running} | {:error, :spider_not_found}
Expand All @@ -119,25 +118,26 @@ defmodule Crawly.Engine do
GenServer.call(__MODULE__, :running_spiders)
end

@spec get_spider_info(module()) :: spider_info()
def get_spider_info(name) do
GenServer.call(__MODULE__, {:get_spider, name})
@spec get_spider_info(Crawly.spider()) :: spider_info() | nil
def get_spider_info(spider_name) do
GenServer.call(__MODULE__, {:get_spider, spider_name})
end

@doc """
Asks the engine process to refresh its list of known spiders.

The request is sent with `GenServer.cast/2` to the process registered
under this module's name, so the caller does not wait for the refresh.
"""
def refresh_spider_list, do: GenServer.cast(__MODULE__, :refresh_spider_list)

@doc """
Starts the engine process and registers it under the module name.

The argument is ignored. It is accepted (and defaults to `[]`) so the
function can be called both as `start_link/0` and with the single
argument a supervisor passes for a `{Crawly.Engine, []}` child entry.
"""
def start_link(_args \\ []) do
  GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

@spec get_crawl_id(atom()) :: {:error, :spider_not_running} | {:ok, binary()}
@spec get_crawl_id(Crawly.spider()) ::
{:error, :spider_not_running} | {:ok, binary()}
# Synchronously asks the engine process for the crawl id of `spider_name`
# and returns the engine's reply unchanged.
def get_crawl_id(spider_name),
  do: GenServer.call(__MODULE__, {:get_crawl_id, spider_name})

@spec init(any) :: {:ok, __MODULE__.t()}
@spec init(any) :: {:ok, t()}
def init(_args) do
spiders = get_updated_known_spider_list()

Expand Down Expand Up @@ -175,7 +175,8 @@ defmodule Crawly.Engine do
end

# Replies with one spider_info map per entry in `state.known_spiders`,
# each built by format_spider_info/2. The state is returned unchanged.
def handle_call(:list_known_spiders, _from, state) do
  spiders_info = Enum.map(state.known_spiders, &format_spider_info(&1, state))
  {:reply, spiders_info, state}
end

def handle_call(
Expand Down Expand Up @@ -227,22 +228,29 @@ defmodule Crawly.Engine do
{:reply, msg, %Crawly.Engine{state | started_spiders: new_started_spiders}}
end

# Replies with the spider_info map for `spider_name` when it is one of the
# known spiders, and with `nil` otherwise. The state is left untouched.
def handle_call({:get_spider, spider_name}, _from, state) do
  reply =
    case spider_name in state.known_spiders do
      true -> format_spider_info(spider_name, state)
      false -> nil
    end

  {:reply, reply, state}
end

# Recomputes the known-spider list, seeding the lookup with the spiders
# already known, and stores the result back into the engine state.
def handle_cast(:refresh_spider_list, state) do
  known_spiders = get_updated_known_spider_list(state.known_spiders)
  new_state = %Crawly.Engine{state | known_spiders: known_spiders}

  {:noreply, new_state}
end

# this function generates a spider_info map for each spider known
defp format_spider_info(state) do
Enum.map(state.known_spiders, fn s ->
pid = Map.get(state.started_spiders, s)

%{
name: s,
status: if(is_nil(pid), do: :stopped, else: :started),
pid: pid
}
end)
# Builds the spider_info map for a single spider: it is reported as
# :started (with its pid) when `state.started_spiders` has an entry for it,
# and as :stopped with a nil pid otherwise.
defp format_spider_info(spider_name, state) do
  case Map.get(state.started_spiders, spider_name) do
    nil -> %{name: spider_name, status: :stopped, pid: nil}
    pid -> %{name: spider_name, status: :started, pid: pid}
  end
end

defp get_updated_known_spider_list(known \\ []) do
Expand Down
23 changes: 5 additions & 18 deletions lib/crawly/engine_sup.ex
Original file line number Diff line number Diff line change
@@ -1,26 +1,16 @@
defmodule Crawly.EngineSup do
# Engine supervisor responsible for spider subtrees
@moduledoc false
use DynamicSupervisor

def start_link do
DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end

def init(:ok) do
DynamicSupervisor.init(strategy: :one_for_one)
end

def start_spider(spider_name, options) do
result =
case Code.ensure_loaded?(spider_name) do
true ->
# Given spider module exists in the namespace, we can proceed
{:ok, _sup_pid} =
DynamicSupervisor.start_child(
__MODULE__,
{Crawly.ManagerSup, [spider_name, options]}
)
DynamicSupervisor.start_child(
__MODULE__,
{Crawly.ManagerSup, [spider_name, options]}
)

false ->
{:error, "Spider: #{inspect(spider_name)} was not defined"}
Expand All @@ -30,9 +20,6 @@ defmodule Crawly.EngineSup do
end

# Terminates the child identified by `pid` under the dynamic supervisor
# registered under this module's name and returns the supervisor's reply.
# The child is terminated exactly once, so the return value reflects
# that single termination attempt.
def stop_spider(pid) do
  DynamicSupervisor.terminate_child(__MODULE__, pid)
end
end
21 changes: 6 additions & 15 deletions lib/crawly/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ defmodule Crawly.Manager do
end

# Starts a manager process for the given spider. The spider name is logged
# once, through inspect/1, before the GenServer is started with the same
# `[spider_name, options]` list as its init argument.
def start_link([spider_name, options]) do
  Logger.debug("Starting the manager for #{inspect(spider_name)}")
  GenServer.start_link(__MODULE__, [spider_name, options])
end

Expand Down Expand Up @@ -105,7 +105,7 @@ defmodule Crawly.Manager do
tref = Process.send_after(self(), :operations, timeout)

Logger.debug(
"Started #{Enum.count(worker_pids)} workers for #{spider_name}"
"Started #{Enum.count(worker_pids)} workers for #{inspect(spider_name)}"
)

{:ok,
Expand Down Expand Up @@ -136,21 +136,21 @@ defmodule Crawly.Manager do

# Split start requests, so it's possible to initialize a part of them in async
# manner
{start_req, async_start_req} =
{start_reqs, async_start_reqs} =
Enum.split(start_requests, @start_request_split_size)

:ok = do_store_requests(state.name, start_req)
:ok = Crawly.RequestsStorage.store(state.name, start_reqs)

Task.start(fn ->
do_store_requests(state.name, async_start_req)
Crawly.RequestsStorage.store(state.name, async_start_reqs)
end)

{:noreply, state}
end

@impl true
def handle_cast({:add_workers, num_of_workers}, state) do
Logger.info("Adding #{num_of_workers} workers for #{state.name}")
Logger.info("Adding #{num_of_workers} workers for #{inspect(state.name)}")

Enum.each(1..num_of_workers, fn _ ->
DynamicSupervisor.start_child(
Expand Down Expand Up @@ -227,15 +227,6 @@ defmodule Crawly.Manager do

defp maybe_convert_to_integer(value) when is_integer(value), do: value

defp do_store_requests(spider_name, requests) do
Enum.each(
requests,
fn request ->
Crawly.RequestsStorage.store(spider_name, request)
end
)
end

# Get a closespider_itemcount or closespider_timeout_limit from config or spider
# settings.
defp get_default_limit(limit_name, spider_name) do
Expand Down
16 changes: 4 additions & 12 deletions lib/crawly/pipelines/write_to_file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,7 @@ defmodule Crawly.Pipelines.WriteToFile do
{:ok, io_device} =
File.open(
Path.join([folder, filename]),
[
:binary,
:write,
:delayed_write,
:utf8
]
[:binary, :write, :delayed_write, :utf8]
)

io_device
Expand All @@ -116,13 +111,10 @@ defmodule Crawly.Pipelines.WriteToFile do
IO.write(io, "\n")
catch
error, reason ->
stacktrace = :erlang.get_stacktrace()

Logger.error(
"Could not write item: #{inspect(error)}, reason: #{inspect(reason)}, stacktrace: #{
inspect(stacktrace)
}
"
"Could not write item: #{inspect(item)} to io: #{inspect(io)}\n#{
Exception.format(error, reason, __STACKTRACE__)
}"
)
end
end
Expand Down
49 changes: 18 additions & 31 deletions lib/crawly/requests_storage/requests_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,20 @@ defmodule Crawly.RequestsStorage do

alias Crawly.RequestsStorage

@doc """
Store requests in related child worker
"""
@spec store(spider_name, requests) :: result
when spider_name: atom(),
requests: [Crawly.Request.t()],
result: :ok | {:error, :storage_worker_not_running}
def store(spider_name, requests) when is_list(requests) do
GenServer.call(__MODULE__, {:store, {spider_name, requests}})
end
@batch_call_max_count 50

@doc """
Store request in related child worker
Store individual request or multiple requests in related child worker
"""
@spec store(spider_name, request) :: :ok
when spider_name: atom(),
request: Crawly.Request.t()
def store(spider_name, %Crawly.Request{} = request) do
store(spider_name, [request])
@spec store(Crawly.spider(), Crawly.Request.t() | [Crawly.Request.t()]) ::
:ok | {:error, :storage_worker_not_running}
# Single request: wrapped in a one-element list and stored with one call.
def store(spider_name, %Crawly.Request{} = request) do
  GenServer.call(__MODULE__, {:store, {spider_name, [request]}})
end

# List of requests: sent in batches of at most @batch_call_max_count per
# call. The first reply other than :ok stops the batching and is returned,
# so a failure is reported the same way as in the single-request clause
# instead of being masked by an unconditional :ok.
# NOTE(review): assumes the storage process replies :ok on success, as the
# @spec states; confirm against its handle_call({:store, ...}) clause.
def store(spider_name, requests) when is_list(requests) do
  requests
  |> Stream.chunk_every(@batch_call_max_count)
  |> Enum.reduce_while(:ok, fn batch, :ok ->
    case GenServer.call(__MODULE__, {:store, {spider_name, batch}}) do
      :ok -> {:cont, :ok}
      error -> {:halt, error}
    end
  end)
end

def store(_spider_name, request) do
Expand All @@ -61,35 +56,27 @@ defmodule Crawly.RequestsStorage do
@doc """
Pop a request out of requests storage
"""
@spec pop(spider_name) :: result
when spider_name: atom(),
result:
nil
| Crawly.Request.t()
| {:error, :storage_worker_not_running}
@spec pop(Crawly.spider()) ::
nil | Crawly.Request.t() | {:error, :storage_worker_not_running}
# Sends a synchronous {:pop, spider_name} call to the storage process and
# returns its reply as is.
def pop(spider_name), do: GenServer.call(__MODULE__, {:pop, spider_name})

@doc """
Get statistics from the requests storage
"""
@spec stats(spider_name) :: result
when spider_name: atom(),
result:
{:stored_requests, non_neg_integer()}
| {:error, :storage_worker_not_running}
@spec stats(Crawly.spider()) ::
{:stored_requests, non_neg_integer()}
| {:error, :storage_worker_not_running}
# Sends a synchronous {:stats, spider_name} call to the storage process and
# returns its reply as is.
def stats(spider_name), do: GenServer.call(__MODULE__, {:stats, spider_name})

@doc """
Starts a worker for a given spider
"""
@spec start_worker(spider_name, crawl_id) :: result
when spider_name: atom(),
crawl_id: String.t(),
result: {:ok, pid()} | {:error, :already_started}
@spec start_worker(Crawly.spider(), crawl_id :: String.t()) ::
{:ok, pid()} | {:error, :already_started}
# Sends a synchronous {:start_worker, spider_name, crawl_id} call to the
# storage process and returns its reply as is.
def start_worker(spider_name, crawl_id),
  do: GenServer.call(__MODULE__, {:start_worker, spider_name, crawl_id})
Expand Down
Loading

0 comments on commit 0a28302

Please sign in to comment.