Skip to content

Commit

Permalink
refactor: update the reader process
Browse files Browse the repository at this point in the history
  • Loading branch information
heywhy committed Apr 18, 2024
1 parent 1e453f6 commit 6eee8e3
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 88 deletions.
2 changes: 2 additions & 0 deletions lib/elasticlunr/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ defmodule Elasticlunr.Application do
use Application

alias Elasticlunr.Compaction
alias Elasticlunr.PubSub

@impl true
def start(_type, _args) do
children = [
PubSub,
{Registry, name: Elasticlunr.Fs, keys: :unique},
{Registry, name: Elasticlunr.IndexRegistry, keys: :unique},
{Task.Supervisor, name: Elasticlunr.FlushMemTableSupervisor},
Expand Down
7 changes: 7 additions & 0 deletions lib/elasticlunr/fs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ defmodule Elasticlunr.Fs do
fd
end

@doc """
Lists the entries that live directly inside the directory `path`.

Returns the full path (`path` joined with the entry name) of every entry whose
name does not start with a dot, sorted by name. Returns `[]` when `path`
cannot be listed (it does not exist, is not a directory, ...).

The directory is listed with `File.ls/1` instead of being spliced into a glob
pattern, so a directory whose name contains glob metacharacters (`[`, `{`,
`*`, `?`) is listed correctly rather than being interpreted as a pattern.
"""
@spec db_files(Path.t()) :: [Path.t()]
def db_files(path) do
  case File.ls(path) do
    {:ok, names} ->
      names
      # Dotfiles are skipped to match the default behaviour of `Path.wildcard/1`.
      |> Enum.reject(&String.starts_with?(&1, "."))
      |> Enum.sort()
      |> Enum.map(&Path.join(path, &1))

    {:error, _reason} ->
      []
  end
end

# coveralls-ignore-start
@spec read(Path.t()) :: binary()
def read(path) do
Expand Down
41 changes: 16 additions & 25 deletions lib/elasticlunr/index/reader.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
defmodule Elasticlunr.Index.Reader do
alias Elasticlunr.Filename
alias Elasticlunr.FileMeta
alias Elasticlunr.Schema
alias Elasticlunr.SSTable
alias Elasticlunr.SSTable.Entry
Expand All @@ -25,33 +27,22 @@ defmodule Elasticlunr.Index.Reader do
struct!(__MODULE__, attrs)
end

defdelegate lockfile?(path), to: SSTable

def add_segment(%__MODULE__{segments: segments} = reader, path) when is_binary(path) do
segments
|> Enum.find(&(&1.path == path))
|> case do
%SSTable{} ->
reader

nil ->
ss_table = SSTable.from_path(path)
%{reader | segments: [ss_table] ++ segments}
end
end

@spec remove_segment(t(), Path.t()) :: t()
def remove_segment(%__MODULE__{segments: segments} = reader, path) when is_binary(path) do
segments
|> Enum.reject(&(&1.path == path))
|> then(&%{reader | segments: &1})
@doc """
Tells whether the SSTable described by `file_meta` is already among the
reader's segments.

The check compares each segment's `path` with the path built by
`Filename.ss_table/2` from the file's directory and number.
"""
@spec loaded?(t(), FileMeta.t()) :: boolean()
def loaded?(%__MODULE__{segments: segments}, %FileMeta{dir: dir, number: number}) do
  expected_path = Filename.ss_table(dir, number)

  Enum.any?(segments, fn segment -> segment.path == expected_path end)
end

@spec load_segments(Path.t()) :: [SSTable.t()]
def load_segments(dir) do
dir
|> SSTable.list()
|> Enum.map(&SSTable.from_path/1)
@doc """
Adds the SSTable described by `file_meta` to the front of the reader's
segments.

Returns `{:ok, reader}` unchanged when the segment is already loaded,
`{:ok, updated_reader}` when it was opened and prepended, and otherwise
whatever `SSTable.from_path/1` returned.
"""
@spec add_segment(t(), FileMeta.t()) :: {:ok, t()} | {:error, File.posix()}
def add_segment(%__MODULE__{segments: segments} = reader, %FileMeta{} = file_meta) do
  if loaded?(reader, file_meta) do
    {:ok, reader}
  else
    case SSTable.from_path(file_meta) do
      {:ok, ss_table} -> {:ok, %{reader | segments: [ss_table | segments]}}
      failure -> failure
    end
  end
end

@spec get(t(), String.t()) :: map() | nil
Expand Down
13 changes: 5 additions & 8 deletions lib/elasticlunr/index/writer.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Elasticlunr.Index.Writer do
use Rop

alias Elasticlunr.Fs
alias Elasticlunr.Filename
alias Elasticlunr.Manifest
alias Elasticlunr.Manifest.Changes
Expand Down Expand Up @@ -62,7 +63,7 @@ defmodule Elasticlunr.Index.Writer do

files_to_delete =
dir
|> db_files()
|> Fs.db_files()
|> Enum.reduce([], fn path, acc ->
case keep?.(path) do
false -> [path] ++ acc
Expand Down Expand Up @@ -131,6 +132,7 @@ defmodule Elasticlunr.Index.Writer do
end

log_files
|> Enum.sort()
|> Enum.reduce_while(params, fn log_number, params ->
params
|> Map.put(:compactions, 0)
Expand Down Expand Up @@ -168,6 +170,7 @@ defmodule Elasticlunr.Index.Writer do
mt = update_mt.(mt, entry)

with {true, mt} <- {MemTable.size(mt) >= mms, mt},
# TODO: add sstable to manifest
:ok <- write_to_level_0(mt, dir) do

Check warning on line 174 in lib/elasticlunr/index/writer.ex

View workflow job for this annotation

GitHub Actions / analyze

call

The function call write_to_level_0 will not succeed.
{:cont, %{acc | mem_table: MemTable.new(), compactions: c + 1}}
else
Expand Down Expand Up @@ -198,7 +201,7 @@ defmodule Elasticlunr.Index.Writer do
known_files = Manifest.known_files(manifest)

dir
|> db_files()
|> Fs.db_files()
|> extract_log_files(known_files)
# TODO: log corruption error due to missing files
|> then(&elem(&1, 1))
Expand All @@ -222,12 +225,6 @@ defmodule Elasticlunr.Index.Writer do
end)
end

defp db_files(path) do
path
|> Path.join("*")
|> Path.wildcard()
end

defp recover_manifest(%{dir: dir} = state) do
path = Filename.current(dir)

Expand Down
41 changes: 41 additions & 0 deletions lib/elasticlunr/pub_sub.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule Elasticlunr.PubSub do
  @moduledoc """
  A minimal in-process publish/subscribe server.

  Processes subscribe to a named stream with `subscribe/1`; every message
  published on that stream with `publish/3` is delivered to each subscriber as
  an `{event, args}` tuple.

  Subscribers are monitored, so a process that terminates is removed from every
  stream it subscribed to instead of lingering in the server state.
  """

  use GenServer

  @doc """
  Subscribes the calling process to `stream`.

  Subscribing more than once to the same stream has no additional effect.
  """
  @spec subscribe(String.t()) :: :ok
  def subscribe(stream) do
    GenServer.call(__MODULE__, {:subscribe, stream, self()})
  end

  @doc """
  Asynchronously sends `{event, args}` to every process subscribed to `stream`.
  """
  @spec publish(String.t(), atom(), nil | term()) :: :ok
  def publish(stream, event, args \\ nil) do
    GenServer.cast(__MODULE__, {:publish, stream, event, args})
  end

  @doc """
  Starts the server, registered under the module name.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # State layout:
  #   streams:  %{stream => MapSet of subscriber pids}
  #   monitors: %{subscriber pid => monitor reference}
  # The options are currently unused; any value is accepted so that
  # `start_link/1` does not crash when options are supplied.
  @impl true
  def init(_opts), do: {:ok, %{streams: %{}, monitors: %{}}}

  @impl true
  def handle_call({:subscribe, stream, pid}, _from, state) do
    # One monitor per subscriber, no matter how many streams it joins.
    monitors = Map.put_new_lazy(state.monitors, pid, fn -> Process.monitor(pid) end)
    streams = Map.update(state.streams, stream, MapSet.new([pid]), &MapSet.put(&1, pid))

    {:reply, :ok, %{state | streams: streams, monitors: monitors}}
  end

  @impl true
  def handle_cast({:publish, stream, event, args}, state) do
    state.streams
    |> Map.get(stream, MapSet.new())
    |> Enum.each(&send(&1, {event, args}))

    {:noreply, state}
  end

  # A monitored subscriber terminated: drop it from every stream and discard
  # streams that are left without subscribers.
  @impl true
  def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    case Map.pop(state.monitors, pid) do
      {^ref, monitors} ->
        streams =
          state.streams
          |> Enum.map(fn {stream, pids} -> {stream, MapSet.delete(pids, pid)} end)
          |> Enum.reject(fn {_stream, pids} -> MapSet.size(pids) == 0 end)
          |> Map.new()

        {:noreply, %{state | streams: streams, monitors: monitors}}

      _unknown_monitor ->
        {:noreply, state}
    end
  end

  # Ignore any other message so stray sends cannot crash the server.
  def handle_info(_message, state), do: {:noreply, state}
end
93 changes: 63 additions & 30 deletions lib/elasticlunr/server/reader.ex
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
defmodule Elasticlunr.Server.Reader do
use GenServer
use Rop

alias Elasticlunr.PubSub
alias Elasticlunr.FileMeta
alias Elasticlunr.Filename
alias Elasticlunr.Fs
alias Elasticlunr.Index.Reader
alias Elasticlunr.Manifest
alias Elasticlunr.SSTable

require Logger

defstruct [:reader, :watcher]

@type t :: %__MODULE__{
reader: Reader.t(),
watcher: pid()
}

@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts) do
opts = Keyword.validate!(opts, [:dir, :schema])
Expand All @@ -22,13 +23,17 @@ defmodule Elasticlunr.Server.Reader do

@impl true
def init(opts) do
with dir <- Keyword.fetch!(opts, :dir),
schema <- Keyword.fetch!(opts, :schema),
# TODO: pushing this action to handle_continue might improve performance
segments <- Reader.load_segments(dir),
watcher <- Fs.watch!(dir),
reader <- Reader.new(dir, schema, segments: segments) do
{:ok, %__MODULE__{reader: reader, watcher: watcher}}
dir = Keyword.fetch!(opts, :dir)
schema = Keyword.fetch!(opts, :schema)

:ok = PubSub.subscribe(schema.name)

read_manifest(%{dir: dir, schema: schema}) >>>
load_ss_tables() >>>
patch_reader()
|> case do
{:error, reason} -> {:stop, reason}
{:ok, reader} -> {:ok, %__MODULE__{reader: reader}}
end
end

Expand All @@ -42,32 +47,60 @@ defmodule Elasticlunr.Server.Reader do

# Handles the `{:add_file, file_meta}` message: when the SSTable file named by
# `file_meta` exists on disk, it is added to the reader.
#
# - the file does not exist: the state is returned unchanged;
# - the segment is added (or was already loaded): the state carries the reader
#   returned by `Reader.add_segment/2`;
# - `Reader.add_segment/2` returns `{:error, reason}`: the failure is logged and
#   the server stops with `reason`, mirroring how `init/1` reports load errors,
#   instead of raising a `WithClauseError` on the unmatched tuple.
@impl true
def handle_info(
      {:add_file, %FileMeta{dir: dir, number: number} = file_meta},
      %__MODULE__{reader: reader} = state
    ) do
  file = Filename.ss_table(dir, number)

  with true <- File.exists?(file),
       {:ok, reader} <- Reader.add_segment(reader, file_meta) do
    Logger.debug("Update reader with #{file}.")
    {:noreply, %{state | reader: reader}}
  else
    false ->
      {:noreply, state}

    {:error, reason} ->
      Logger.error("Failed to update reader with #{file}: #{inspect(reason)}.")
      {:stop, reason, state}
  end
end

:remove ->
path = Path.dirname(path)
defp read_manifest(%{dir: dir} = state) do
path = Filename.current(dir)

{:noreply, %{state | reader: Reader.remove_segment(reader, path)}}
with {:ok, content} <- File.read(path),
manifest_path = Path.join(dir, content),
{:ok, manifest} <- Manifest.from_path(manifest_path) do
{:ok, Map.put(state, :manifest, manifest)}
end
end

def handle_info(
{:file_event, watcher, :stop},
%__MODULE__{watcher: watcher, reader: reader} = state
) do
Logger.debug("Stop watching directory #{reader.dir}.")
# INFO: pushing this action to handle_continue might improve performance
# Opens every SSTable found in `dir` whose file number is listed in the
# manifest's known files.
#
# Returns `{:ok, state}` with the opened tables stored under `:ss_tables`, or
# the first `{:error, reason}` produced while walking the directory. Paths
# that are not SSTables, or whose number is not known to the manifest, are
# skipped.
defp load_ss_tables(%{dir: dir, manifest: manifest} = state) do
  known_files = Manifest.known_files(manifest)

  loaded =
    Enum.reduce_while(Fs.db_files(dir), [], fn path, ss_tables ->
      with {:sst, number} <- Filename.parse(path),
           true <- MapSet.member?(known_files, number),
           {:ok, ss_table} <- SSTable.from_path(%FileMeta{dir: dir, number: number}) do
        {:cont, [ss_table | ss_tables]}
      else
        # An error aborts the walk and becomes the result of the reduction.
        {:error, _reason} = error -> {:halt, error}
        _ -> {:cont, ss_tables}
      end
    end)

  if is_list(loaded) do
    {:ok, Map.put(state, :ss_tables, loaded)}
  else
    loaded
  end
end

{:noreply, state}
# Final step of the start-up pipeline: builds the index reader from the
# directory, the schema and the SSTables loaded so far.
defp patch_reader(%{dir: dir, schema: schema, ss_tables: segments}) do
  # TODO: close manifest since it opens a file handle
  reader = Reader.new(dir, schema, segments: segments)

  {:ok, reader}
end
end
22 changes: 11 additions & 11 deletions lib/elasticlunr/server/writer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Elasticlunr.Server.Writer do
alias Elasticlunr.Manifest
alias Elasticlunr.Manifest.Changes
alias Elasticlunr.{FlushMemTableSupervisor, SSTable, Wal}
alias Elasticlunr.PubSub

require Logger

Expand All @@ -24,16 +25,12 @@ defmodule Elasticlunr.Server.Writer do
schema = Keyword.fetch!(opts, :schema)
mt_max_size = Keyword.fetch!(opts, :mem_table_max_size)

writer = Writer.new(dir, schema, mt_max_size)

{:ok, %__MODULE__{writer: writer}, {:continue, :recover}}
end

@impl true
def handle_continue(:recover, %__MODULE__{writer: writer} = state) do
case Writer.recover(writer) do
{:ok, writer} -> {:noreply, %{state | writer: writer}}
{:error, reason} -> {:stop, reason, state}
dir
|> Writer.new(schema, mt_max_size)
|> Writer.recover()
|> case do
{:ok, writer} -> {:ok, %__MODULE__{writer: writer}}
{:error, reason} -> {:stop, reason}
end
end

Expand Down Expand Up @@ -82,7 +79,7 @@ defmodule Elasticlunr.Server.Writer do
end

def handle_info({:add_file, file_meta}, %__MODULE__{writer: writer} = state) do
%Writer{manifest: manifest} = writer
%Writer{manifest: manifest, schema: schema} = writer

%Changes{}
|> Changes.add_file(file_meta)
Expand All @@ -91,6 +88,8 @@ defmodule Elasticlunr.Server.Writer do
{:ok, manifest} ->
writer = %{writer | manifest: manifest}

:ok = PubSub.publish(schema.name, :add_file, file_meta)

{:noreply, %{state | writer: writer}}
end
end
Expand Down Expand Up @@ -161,6 +160,7 @@ defmodule Elasticlunr.Server.Writer do
# This steps should be encapsulated in the writer module but wasn't
# because of data copying from this server to the task process
with {:ok, file_meta} <- SSTable.flush(mem_table, file_meta),
# TODO: remove wal from manifest
:ok <- Wal.delete(wal),
true <- file_meta.size > 0 do
file_meta
Expand Down
15 changes: 1 addition & 14 deletions test/server/reader_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ defmodule Elasticlunr.Server.ReaderTest do
assert eventually(fn -> GenServer.call(pid, {:get, document.id}) end)
end

@tag skip: "testing should be all about compacting sstables"
test "update internals when a segment is deleted", %{
dir: dir,
pid: pid,
Expand All @@ -69,7 +70,6 @@ defmodule Elasticlunr.Server.ReaderTest do

assert eventually(fn -> GenServer.call(pid, {:get, document.id}) end)
assert Enum.each(ss_tables, &File.rm_rf!/1)
assert wait_for_lockfile_event()
assert_received {:remove_lockfile, _dir, _path}
assert eventually(fn -> GenServer.call(pid, {:get, document.id}) == nil end)
end
Expand All @@ -83,17 +83,4 @@ defmodule Elasticlunr.Server.ReaderTest do
assert entry = eventually(fn -> GenServer.call(pid, {:get, document.id}) end)
assert entry.id == document.id
end

# Blocks until a `:file_event` message reports the removal of a lockfile, then
# sends `{:remove_lockfile, dir, path}` to the current process. File events
# that do not match are consumed and the wait continues.
defp wait_for_lockfile_event do
  receive do
    {:file_event, _watcher, {path, events}} ->
      lockfile_removed? =
        SSTable.lockfile?(path) and Fs.event_to_action(events) == :remove

      if lockfile_removed? do
        send(self(), {:remove_lockfile, Path.dirname(path), path})
      else
        wait_for_lockfile_event()
      end
  end
end
end

0 comments on commit 6eee8e3

Please sign in to comment.