
Commit

refactor: encapsulate binary construction into utils
heywhy committed May 2, 2024
1 parent cb87634 commit db532b1
Showing 30 changed files with 538 additions and 399 deletions.
51 changes: 26 additions & 25 deletions lib/elasticlunr/bloom/stackable.ex
@@ -10,6 +10,7 @@ defmodule Elasticlunr.Bloom.Stackable do
"""

alias Elasticlunr.Bloom
alias Elasticlunr.Encoding

defstruct [:capacity, :count, :fp_rate, :expansion, :bloom_filters]

@@ -80,46 +81,46 @@ defmodule Elasticlunr.Bloom.Stackable do
expansion: expansion
}) do
metadata =
<<capacity::unsigned-integer-size(64), fp_rate::unsigned-float, expansion::unsigned-integer,
count::unsigned-integer-size(64)>>
[]
|> Encoding.put_int64(capacity)
|> Encoding.put_float(fp_rate)
|> Encoding.put_int(expansion)
|> Encoding.put_int64(count)

bfs
|> Enum.map(fn bloom_filter ->
data = Bloom.serialize(bloom_filter)
size = byte_size(data)

<<size::unsigned-integer-size(32), data::binary>>
bloom_filter
|> Bloom.serialize()
|> then(&Encoding.put_size_prefixed([], &1))
end)
|> then(&Enum.concat([metadata], &1))
end

@spec decode(binary()) :: {:ok, t()}
def decode(binary) when is_binary(binary) do
with {:ok, opts, binary} when is_list(opts) <- read_metadata(binary),
bloom_filters when is_list(bloom_filters) <- read_filters(binary) do
{:ok, struct!(__MODULE__, [bloom_filters: bloom_filters] ++ opts)}
end
@spec decode!(binary()) :: t() | no_return()
def decode!(binary) when is_binary(binary) do
{opts, binary} = read_metadata(binary)
bloom_filters = read_filters(binary)

struct!(__MODULE__, [bloom_filters: bloom_filters] ++ opts)
end

defp read_metadata(binary) do
with <<capacity::unsigned-integer-size(64), binary::binary>> <- binary,
<<fp_rate::unsigned-float, binary::binary>> <- binary,
<<expansion::unsigned-integer, binary::binary>> <- binary,
<<count::unsigned-integer-size(64), binary::binary>> <- binary do
{:ok, [fp_rate: fp_rate, capacity: capacity, count: count, expansion: expansion], binary}
else
_ -> {:error, :bloom_filter_corruption}
end
{capacity, binary} = Encoding.chop_int64!(binary)
{fp_rate, binary} = Encoding.chop_float!(binary)
{expansion, binary} = Encoding.chop_int!(binary)
{count, binary} = Encoding.chop_int64!(binary)

{[fp_rate: fp_rate, capacity: capacity, count: count, expansion: expansion], binary}
end

defp read_filters(binary, acc \\ [])
defp read_filters(<<>>, acc), do: Enum.reverse(acc)

defp read_filters(binary, acc) do
with <<size::unsigned-integer-size(32), binary::binary>> <- binary,
<<data::binary-size(size), binary::binary>> <- binary,
bloom_filter <- Bloom.deserialize(data) do
read_filters(binary, [bloom_filter] ++ acc)
end
{data, binary} = Encoding.chop_size_prefixed!(binary)

data
|> Bloom.deserialize()
|> then(&read_filters(binary, [&1] ++ acc))
end
end
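
For orientation, here is a minimal round-trip sketch of the metadata layout that encode/0 now builds with the Encoding helpers introduced below (an 8-byte capacity, a float fp_rate, a single-byte expansion, and an 8-byte count). The literal values are illustrative only.

alias Elasticlunr.Encoding

# Build the metadata header the same way Stackable.encode/0 does.
metadata =
  []
  |> Encoding.put_int64(1_000)   # capacity
  |> Encoding.put_float(0.01)    # fp_rate
  |> Encoding.put_int(2)         # expansion
  |> Encoding.put_int64(42)      # count
  |> IO.iodata_to_binary()

# read_metadata/1 chops the same fields back off in order.
{1_000, rest} = Encoding.chop_int64!(metadata)
{0.01, rest} = Encoding.chop_float!(rest)
{2, rest} = Encoding.chop_int!(rest)
{42, <<>>} = Encoding.chop_int64!(rest)
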
134 changes: 134 additions & 0 deletions lib/elasticlunr/encoding.ex
@@ -0,0 +1,134 @@
defmodule Elasticlunr.Encoding do
@spec put_boolean(iodata(), boolean()) :: iodata()
def put_boolean(data, bool) when is_list(data) and is_boolean(bool) do
case bool do
true -> data ++ [<<1>>]
false -> data ++ [<<0>>]
end
end

@spec put_float(iodata(), float()) :: iodata()
def put_float(data, number) when is_list(data) do
data ++ [<<number::float>>]
end

@spec put_float64(iodata(), float()) :: iodata()
def put_float64(data, number) when is_list(data) and is_float(number) do
data ++ [<<number::float-size(8)-unit(8)>>]
end

@spec put_int(iodata(), integer()) :: iodata()
def put_int(data, number) when is_list(data) do
data ++ [number]
end

@spec put_int32(iodata(), integer()) :: iodata()
def put_int32(data, number) when is_list(data) and is_integer(number) do
data ++ [<<number::integer-size(4)-unit(8)>>]
end

@spec put_int64(iodata(), integer()) :: iodata()
def put_int64(data, number) when is_list(data) and is_integer(number) do
data ++ [<<number::integer-size(8)-unit(8)>>]
end

@type size :: :big | :tiny

@spec put_size_prefixed(iodata(), iodata(), size()) :: iodata()
def put_size_prefixed(data, binary, size \\ :big)

def put_size_prefixed(data, content, :big) when is_list(data) do
size = IO.iodata_length(content)

data
|> put_int64(size)
|> then(&(&1 ++ [content]))
end

def put_size_prefixed(data, content, :tiny) when is_list(data) do
size = IO.iodata_length(content)

data
|> put_int(size)
|> then(&(&1 ++ [content]))
end

@spec get_boolean!(binary() | IO.device()) :: boolean() | IO.nodata() | no_return()
def get_boolean!(<<1>>), do: true
def get_boolean!(<<0>>), do: false

def get_boolean!(device) when is_pid(device) do
device
|> IO.binread(1)
|> get_boolean!()
end

@spec get_int!(IO.device()) :: integer()
def get_int!(device) when is_pid(device) do
device
|> IO.binread(1)
|> then(&(<<_::unsigned-integer>> = &1))
|> then(&:binary.decode_unsigned/1)
end

@spec get_int64!(binary() | IO.device()) :: integer() | IO.nodata() | no_return()
def get_int64!(<<number::integer-size(8)-unit(8)>>), do: number

def get_int64!(device) when is_pid(device) do
device
|> IO.binread(8)
|> get_int64!()
end

@spec get_size_prefixed!(IO.device()) :: binary() | no_return()
def get_size_prefixed!(device) when is_pid(device) do
device
|> get_int64!()
|> then(&IO.binread(device, &1))
|> then(&(<<_::binary>> = &1))
end

@spec chop_boolean!(binary()) :: {boolean(), binary()} | no_return()
def chop_boolean!(<<1, binary::binary>>), do: {true, binary}
def chop_boolean!(<<0, binary::binary>>), do: {false, binary}

@spec chop_float!(binary()) :: {float(), binary()} | no_return()
def chop_float!(<<number::float, rest::binary>>), do: {number, rest}

@spec chop_float64!(binary()) :: {float(), binary()} | no_return()
def chop_float64!(<<number::float-size(8)-unit(8), rest::binary>>) do
{number, rest}
end

@spec chop_int!(binary()) :: {integer(), binary()} | no_return()
def chop_int!(<<number::integer-size(1)-unit(8), rest::binary>>) do
{number, rest}
end

@spec chop_int32!(binary()) :: {integer(), binary()} | no_return()
def chop_int32!(<<number::integer-size(4)-unit(8), rest::binary>>) do
{number, rest}
end

@spec chop_int64!(binary()) :: {integer(), binary()} | no_return()
def chop_int64!(<<number::integer-size(8)-unit(8), rest::binary>>) do
{number, rest}
end

@spec chop_size_prefixed!(binary(), size()) :: {binary(), binary()} | no_return()
def chop_size_prefixed!(binary, size \\ :big)

def chop_size_prefixed!(binary, :big) do
{size, binary} = chop_int64!(binary)
<<value::binary-size(size), binary::binary>> = binary

{value, binary}
end

def chop_size_prefixed!(binary, :tiny) do
{size, binary} = chop_int!(binary)
<<value::binary-size(size), binary::binary>> = binary

{value, binary}
end
end
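
As a quick usage sketch of the new module (values are illustrative): size-prefixed payloads default to an 8-byte :big length prefix, :tiny uses a single length byte, and booleans occupy one byte.

alias Elasticlunr.Encoding

# Length-prefix a payload with the default 8-byte (:big) prefix...
big = [] |> Encoding.put_size_prefixed("hello") |> IO.iodata_to_binary()
{"hello", <<>>} = Encoding.chop_size_prefixed!(big)

# ...or with a single length byte (:tiny) for short values.
tiny = [] |> Encoding.put_size_prefixed("hi", :tiny) |> IO.iodata_to_binary()
{"hi", <<>>} = Encoding.chop_size_prefixed!(tiny, :tiny)

# Booleans encode to a single byte.
<<1>> = [] |> Encoding.put_boolean(true) |> IO.iodata_to_binary()
{true, <<>>} = Encoding.chop_boolean!(<<1>>)
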
3 changes: 1 addition & 2 deletions lib/elasticlunr/field.ex
@@ -2,8 +2,7 @@ defmodule Elasticlunr.Field do
@enforce_keys [:name, :type]
defstruct [:name, :type]

@type literal :: :uid | :date | :number | :text
@type type :: literal() | {:array, literal()}
@type type :: :uid | :date | :number | :text | :array
@type document :: %{id: binary(), content: binary() | number() | Date.t()}

@type t :: %__MODULE__{
8 changes: 4 additions & 4 deletions lib/elasticlunr/index/reader.ex
@@ -45,13 +45,13 @@ defmodule Elasticlunr.Index.Reader do
end
end

@spec get(t(), String.t()) :: map() | nil
def get(%__MODULE__{schema: schema, segments: segments}, id) do
@spec get!(t(), String.t()) :: map() | nil | no_return()
def get!(%__MODULE__{schema: schema, segments: segments}, id) do
id = Utils.id_from_string(id)

segments
|> Enum.filter(&SSTable.contains?(&1, id))
|> Task.async_stream(&SSTable.get(&1, id))
|> Task.async_stream(&SSTable.get!(&1, id))
|> Stream.map(fn {:ok, entry} -> entry end)
# reject nil values in case of false positive by the bloom filter
|> Stream.reject(&is_nil/1)
@@ -65,7 +65,7 @@

defp entry_to_document(%Entry{key: key, value: value}, schema) do
schema
|> Schema.binary_to_document(value)
|> Schema.decode!(value)
|> Map.put(:id, Utils.id_to_string(key))
end
end
6 changes: 3 additions & 3 deletions lib/elasticlunr/index/writer.ex
@@ -196,7 +196,7 @@ defmodule Elasticlunr.Index.Writer do

dir
|> Filename.log(log_number)
|> Iterator.new()
|> Iterator.new!()
|> Enum.reduce_while(params, fn entry, acc ->
%{mem_table: mt, compactions: c, manifest: manifest, mt_max_size: mms} = acc
mt = update_mt.(mt, entry)
@@ -331,7 +331,7 @@
def get(%__MODULE__{mem_table: mem_table, schema: schema}, id) do
with id <- Utils.id_from_string(id),
%MemTableEntry{deleted: false, value: value} <- MemTable.get(mem_table, id),
value <- Schema.binary_to_document(schema, value) do
value <- Schema.decode!(schema, value) do
Map.put(value, :id, Utils.id_to_string(id))
else
%MemTableEntry{deleted: true} -> nil
@@ -383,7 +383,7 @@
|> Map.pop!(:id)

with timestamp <- Utils.now(),
value <- Schema.document_to_binary(schema, document),
value <- Schema.encode(schema, document),
mem_table <- MemTable.set(writer.mem_table, id, value, timestamp),
{:ok, wal} <- Wal.set(writer.wal, id, value, timestamp),
document <- Map.put(document, :id, Utils.id_to_string(id)) do
2 changes: 1 addition & 1 deletion lib/elasticlunr/manifest.ex
@@ -178,7 +178,7 @@ defmodule Elasticlunr.Manifest do
defp read_and_apply_changes(manifest, fd) do
with <<size::unsigned-integer>> <- IO.binread(fd, 1),
binary when is_binary(binary) <- IO.binread(fd, size),
%{} = changes <- Changes.decode(binary),
%{} = changes <- Changes.decode!(binary),
{:ok, %{manifest: manifest}} <- do_apply(manifest, changes) do
read_and_apply_changes(manifest, fd)
else
70 changes: 39 additions & 31 deletions lib/elasticlunr/manifest/changes.ex
@@ -1,4 +1,5 @@
defmodule Elasticlunr.Manifest.Changes do
alias Elasticlunr.Encoding
alias Elasticlunr.FileMeta

defstruct [:log_number, :next_file_number, new_files: []]
@@ -47,52 +48,59 @@ defmodule Elasticlunr.Manifest.Changes do
|> Enum.map(fn {key, value} -> encode_field(key, value) end)
end

@spec decode(binary()) :: t()
def decode(binary), do: do_decode(binary, %__MODULE__{})
@spec decode!(binary()) :: t() | no_return()
def decode!(binary), do: decode!(binary, %__MODULE__{})

defp do_decode(<<>>, changes), do: changes
defp decode!(<<>>, changes), do: changes

defp do_decode(<<@k_log_number, log_number::unsigned-integer-size(64), rest::binary>>, changes) do
do_decode(rest, %{changes | log_number: log_number})
end
defp decode!(binary, changes) do
{tag, binary} = Encoding.chop_int!(binary)

defp do_decode(
<<@k_next_file_number, next_file_number::unsigned-integer-size(64), rest::binary>>,
changes
) do
do_decode(rest, %{changes | next_file_number: next_file_number})
end
case tag do
@k_log_number ->
{log_number, binary} = Encoding.chop_int64!(binary)
decode!(binary, %{changes | log_number: log_number})

defp do_decode(
<<@k_new_file, level::unsigned-integer, number::unsigned-integer-size(64),
size::unsigned-integer-size(64), sk_size::unsigned-integer-size(8 * 4),
sk::binary-size(sk_size), lk_size::unsigned-integer-size(8 * 4),
lk::binary-size(lk_size), rest::binary>>,
changes
) do
file_meta = %FileMeta{number: number, smallest_key: sk, largest_key: lk, size: size}
@k_next_file_number ->
{next_file_number, binary} = Encoding.chop_int64!(binary)
decode!(binary, %{changes | next_file_number: next_file_number})

changes
|> add_file(file_meta, level)
|> then(&do_decode(rest, &1))
@k_new_file ->
{level, binary} = Encoding.chop_int!(binary)
{number, binary} = Encoding.chop_int64!(binary)
{size, binary} = Encoding.chop_int64!(binary)
{sk, binary} = Encoding.chop_size_prefixed!(binary)
{lk, binary} = Encoding.chop_size_prefixed!(binary)

file_meta = %FileMeta{number: number, smallest_key: sk, largest_key: lk, size: size}

changes
|> add_file(file_meta, level)
|> then(&decode!(binary, &1))
end
end

defp encode_field(:log_number, value) do
<<@k_log_number::unsigned-integer, value::unsigned-integer-size(64)>>
[]
|> Encoding.put_int(@k_log_number)
|> Encoding.put_int64(value)
end

defp encode_field(:next_file_number, value) do
<<@k_next_file_number::unsigned-integer, value::unsigned-integer-size(64)>>
[]
|> Encoding.put_int(@k_next_file_number)
|> Encoding.put_int64(value)
end

defp encode_field(:new_files, files) do
Enum.map(files, fn {level, %{number: number, size: size, largest_key: lk, smallest_key: sk}} ->
<<@k_new_file::unsigned-integer, level::unsigned-integer, number::unsigned-integer-size(64),
size::unsigned-integer-size(64), encode_key(sk)::binary, encode_key(lk)::binary>>
[]
|> Encoding.put_int(@k_new_file)
|> Encoding.put_int(level)
|> Encoding.put_int64(number)
|> Encoding.put_int64(size)
|> Encoding.put_size_prefixed(sk)
|> Encoding.put_size_prefixed(lk)
end)
end

defp encode_key(key) do
<<byte_size(key)::unsigned-integer-size(32), key::binary>>
end
end
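
To illustrate the tag-then-payload scheme that encode_field/2 and decode!/2 now share, here is a sketch using a made-up tag byte, since the real @k_log_number constant is defined elsewhere in the module and not shown in this diff.

alias Elasticlunr.Encoding

# Hypothetical tag byte standing in for @k_log_number.
k_log_number = 1

# encode_field(:log_number, value) emits <tag byte><value as int64>.
encoded =
  []
  |> Encoding.put_int(k_log_number)
  |> Encoding.put_int64(123)
  |> IO.iodata_to_binary()

# decode!/2 chops the tag first, then reads the payload it announces.
{^k_log_number, rest} = Encoding.chop_int!(encoded)
{123, <<>>} = Encoding.chop_int64!(rest)
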
