Permalink
Browse files

Speed up and refactor data loading

  • Loading branch information...
josevalim committed Sep 27, 2018
1 parent 0b52e54 commit c34046be0317903785d5f7f3ece35ac6e0f64008
Showing with 204 additions and 172 deletions.
  1. +7 −9 lib/ecto/repo/queryable.ex
  2. +5 −5 lib/ecto/repo/schema.ex
  3. +9 −157 lib/ecto/schema.ex
  4. +120 −0 lib/ecto/schema/loader.ex
  5. +62 −0 lib/ecto/schema/metadata.ex
  6. +1 −1 lib/ecto/type.ex
View
@@ -186,9 +186,11 @@ defmodule Ecto.Repo.Queryable do
end
end
defp preprocessor({_, {:source, source_schema, prefix, fields}}, preprocess, adapter) do
defp preprocessor({_, {:source, {source, schema}, prefix, types}}, preprocess, adapter) do
struct = Ecto.Schema.Loader.load_struct(schema, prefix, source)
fn row ->
{entry, rest} = process_source(source_schema, fields, row, false, prefix, adapter)
{entry, rest} = Ecto.Schema.Loader.adapter_load(struct, types, row, false, adapter)
preprocess(rest, preprocess, entry, adapter)
end
end
@@ -229,8 +231,9 @@ defmodule Ecto.Repo.Queryable do
defp process(row, {:source, :from}, from, _adapter) do
{from, row}
end
defp process(row, {:source, source_schema, prefix, fields}, _from, adapter) do
process_source(source_schema, fields, row, true, prefix, adapter)
defp process(row, {:source, {source, schema}, prefix, types}, _from, adapter) do
struct = Ecto.Schema.Loader.load_struct(schema, prefix, source)
Ecto.Schema.Loader.adapter_load(struct, types, row, true, adapter)
end
defp process(row, {:merge, left, right}, from, adapter) do
{left, row} = process(row, left, from, adapter)
@@ -306,11 +309,6 @@ defmodule Ecto.Repo.Queryable do
{data, row}
end
defp process_source({source, schema}, types, row, all_nil?, prefix, adapter) do
struct = if schema, do: schema.__struct__(), else: %{}
Ecto.Schema.__adapter_load__(struct, types, row, adapter, prefix, source, all_nil?)
end
defp process_args(args, row, from, adapter) do
Enum.map_reduce(args, row, fn arg, row ->
process(row, arg, from, adapter)
View
@@ -67,11 +67,11 @@ defmodule Ecto.Repo.Schema do
for row <- rows, do: Map.new(Enum.zip(fields, row))
end
defp postprocess(rows, types, adapter, schema, %{source: source, prefix: prefix}) do
struct = schema.__struct__()
defp postprocess(rows, types, adapter, schema, %{prefix: prefix, source: source}) do
struct = Ecto.Schema.Loader.load_struct(schema, prefix, source)
for row <- rows do
{loaded, _} = Ecto.Schema.__adapter_load__(struct, types, row, adapter, prefix, source, false)
{loaded, _} = Ecto.Schema.Loader.adapter_load(struct, types, row, false, adapter)
loaded
end
end
@@ -430,9 +430,9 @@ defmodule Ecto.Repo.Schema do
defp do_load(schema, {fields, values}, loader) when is_list(fields) and is_list(values),
do: do_load(schema, Enum.zip(fields, values), loader)
defp do_load(schema, data, loader) when is_atom(schema),
do: Ecto.Schema.__unsafe_load__(schema, data, loader)
do: Ecto.Schema.Loader.unsafe_load(schema, data, loader)
defp do_load(types, data, loader) when is_map(types),
do: Ecto.Schema.__unsafe_load__(%{}, types, data, loader)
do: Ecto.Schema.Loader.unsafe_load(%{}, types, data, loader)
## Helpers
View
@@ -399,73 +399,14 @@ defmodule Ecto.Schema do
defined so structs and changeset functionalities are available.
"""
alias Ecto.Schema.Metadata
@type source :: String.t
@type prefix :: String.t | nil
@type schema :: %{optional(atom) => any, __struct__: atom, __meta__: Ecto.Schema.Metadata.t}
@type schema :: %{optional(atom) => any, __struct__: atom, __meta__: Metadata.t}
@type embedded_schema :: %{optional(atom) => any, __struct__: atom}
@type t :: schema | embedded_schema
defmodule Metadata do
@moduledoc """
Stores metadata of a struct.
## State
The state of the schema is stored in the `:state` field and allows
following values:
* `:built` - the struct was constructed in memory and is not persisted
to database yet;
* `:loaded` - the struct was loaded from database and represents
persisted data;
* `:deleted` - the struct was deleted and no longer represents persisted
data.
## Source
The `:source` tracks the (table or collection) where the struct is or should
be persisted to.
## Prefix
Tracks the source prefix in the data storage.
## Context
The `:context` field represents additional state some databases require
for proper updates of data. It is not used by the built-in adapters of
`Ecto.Adapters.Postres` and `Ecto.Adapters.MySQL`.
## Schema
The `:schema` field refers the module name for the schema this metadata belongs to.
"""
defstruct [:state, :source, :context, :schema, :prefix]
@type state :: :built | :loaded | :deleted
@type t :: %__MODULE__{
context: any,
prefix: Ecto.Schema.prefix,
schema: module,
source: Ecto.Schema.source,
state: state,
}
defimpl Inspect do
import Inspect.Algebra
def inspect(metadata, opts) do
%{source: source, prefix: prefix, state: state, context: context} = metadata
entries =
for entry <- [state, prefix, source, context],
entry != nil,
do: to_doc(entry, opts)
concat ["#Ecto.Schema.Metadata<"] ++ Enum.intersperse(entries, ", ") ++ [">"]
end
end
end
@doc false
defmacro __using__(_) do
quote do
@@ -573,6 +514,7 @@ defmodule Ecto.Schema do
field_sources = @ecto_field_sources |> Enum.reverse
assocs = @ecto_assocs |> Enum.reverse
embeds = @ecto_embeds |> Enum.reverse
loaded = Ecto.Schema.__loaded__(__MODULE__, @struct_fields)
defstruct @struct_fields
@@ -589,6 +531,7 @@ defmodule Ecto.Schema do
def __schema__(:autogenerate_id), do: unquote(Macro.escape(@ecto_autogenerate_id))
def __schema__(:autogenerate), do: unquote(Macro.escape(autogenerate))
def __schema__(:autoupdate), do: unquote(Macro.escape(autoupdate))
def __schema__(:loaded), do: unquote(Macro.escape(loaded))
def __schema__(:query) do
%Ecto.Query{
@@ -1819,104 +1762,13 @@ defmodule Ecto.Schema do
end
@doc false
# Loads data into struct by assumes fields are properly
# named and belongs to the struct. Types and values are
# zipped together in one pass as they are loaded.
def __adapter_load__(struct, types, values, adapter, prefix, source, all_nil?) do
case adapter_load_zip(types, values, [], all_nil?, struct, adapter) do
{nil, rest} ->
{nil, rest}
{zipped, rest} ->
case Map.merge(struct, zipped) do
%{__meta__: %Metadata{} = metadata} = struct ->
metadata = %{metadata | state: :loaded, source: source, prefix: prefix}
{Map.put(struct, :__meta__, metadata), rest}
map ->
{map, rest}
end
end
end
defp adapter_load_zip([{field, type} | types], [value | values], acc, all_nil?, struct, adapter) do
all_nil? = all_nil? and value == nil
value = adapter_load!(struct, field, type, value, adapter)
adapter_load_zip(types, values, [{field, value} | acc], all_nil?, struct, adapter)
end
defp adapter_load_zip([], values, _acc, true, _struct, _adapter) do
{nil, values}
end
defp adapter_load_zip([], values, acc, false, _struct, _adapter) do
{Map.new(acc), values}
end
@doc false
# Assumes data does not all belongs to schema/struct
# and that it may also require source-based renaming.
def __unsafe_load__(schema, data, loader) do
types = schema.__schema__(:load)
struct = schema.__struct__()
case __unsafe_load__(struct, types, data, loader) do
%{__meta__: %Metadata{} = metadata} = struct ->
Map.put(struct, :__meta__, %{metadata | state: :loaded})
map ->
map
def __loaded__(module, struct_fields) do
case Map.new([{:__struct__, module} | struct_fields]) do
%{__meta__: meta} = struct -> %{struct | __meta__: Map.put(meta, :state, :loaded)}
struct -> struct
end
end
@doc false
def __unsafe_load__(struct, types, map, loader) when is_map(map) do
Enum.reduce(types, struct, fn pair, acc ->
{field, source, type} = field_source_and_type(pair)
case fetch_string_or_atom_field(map, source) do
{:ok, value} -> Map.put(acc, field, load!(struct, field, type, value, loader))
:error -> acc
end
end)
end
@compile {:inline, field_source_and_type: 1, fetch_string_or_atom_field: 2}
defp field_source_and_type({field, {:source, source, type}}) do
{field, source, type}
end
defp field_source_and_type({field, type}) do
{field, field, type}
end
defp fetch_string_or_atom_field(map, field) when is_atom(field) do
case Map.fetch(map, Atom.to_string(field)) do
{:ok, value} -> {:ok, value}
:error -> Map.fetch(map, field)
end
end
defp adapter_load!(struct, field, type, value, adapter) do
case Ecto.Type.adapter_load(adapter, type, value) do
{:ok, value} -> value
:error -> bad_load!(field, type, value, struct)
end
end
defp load!(struct, field, type, value, loader) do
case loader.(type, value) do
{:ok, value} -> value
:error -> bad_load!(field, type, value, struct)
end
end
defp bad_load!(field, type, value, struct) do
raise ArgumentError, "cannot load `#{inspect value}` as type #{inspect type} " <>
"for field `#{field}`#{error_data(struct)}"
end
defp error_data(%{__struct__: atom}) do
" in schema #{inspect atom}"
end
defp error_data(other) when is_map(other) do
""
end
@doc false
def __field__(mod, name, type, opts) do
check_field_type!(name, type, opts)
View
@@ -0,0 +1,120 @@
defmodule Ecto.Schema.Loader do
@moduledoc false
alias Ecto.Schema.Metadata
@doc """
Loads a struct to be used as a template in further operations.
"""
def load_struct(nil, _prefix, _source), do: %{}
def load_struct(schema, prefix, source) do
case schema.__schema__(:loaded) do
%{__meta__: %Metadata{prefix: ^prefix, source: ^source}} = struct ->
struct
%{__meta__: %Metadata{} = metadata} = struct ->
Map.put(struct, :__meta__, %{metadata | source: source, prefix: prefix})
%{} = struct ->
struct
end
end
@doc """
Loads data into struct by assumes fields are properly
named and belongs to the struct. Types and values are
zipped together in one pass as they are loaded.
"""
def adapter_load(struct, types, values, all_nil?, adapter) do
adapter_load(types, values, [], all_nil?, struct, adapter)
end
defp adapter_load([{field, type} | types], [value | values], acc, all_nil?, struct, adapter) do
all_nil? = all_nil? and value == nil
value = adapter_load!(struct, field, type, value, adapter)
adapter_load(types, values, [{field, value} | acc], all_nil?, struct, adapter)
end
defp adapter_load([], values, _acc, true, _struct, _adapter) do
{nil, values}
end
defp adapter_load([], values, acc, false, struct, _adapter) do
{Map.merge(struct, Map.new(acc)), values}
end
@doc """
Loads data coming from the user/embeds into schema.
Assumes data does not all belongs to schema/struct
and that it may also require source-based renaming.
"""
def unsafe_load(schema, data, loader) do
types = schema.__schema__(:load)
struct = schema.__schema__(:loaded)
unsafe_load(struct, types, data, loader)
end
@doc """
Loads data coming from the user/embeds into struct and types.
Assumes data does not all belongs to schema/struct
and that it may also require source-based renaming.
"""
def unsafe_load(struct, types, map, loader) when is_map(map) do
Enum.reduce(types, struct, fn pair, acc ->
{field, source, type} = field_source_and_type(pair)
case fetch_string_or_atom_field(map, source) do
{:ok, value} -> Map.put(acc, field, load!(struct, field, type, value, loader))
:error -> acc
end
end)
end
@compile {:inline, field_source_and_type: 1, fetch_string_or_atom_field: 2}
defp field_source_and_type({field, {:source, source, type}}) do
{field, source, type}
end
defp field_source_and_type({field, type}) do
{field, field, type}
end
defp fetch_string_or_atom_field(map, field) when is_atom(field) do
case Map.fetch(map, Atom.to_string(field)) do
{:ok, value} -> {:ok, value}
:error -> Map.fetch(map, field)
end
end
@compile {:inline, load!: 5, adapter_load!: 5}
defp adapter_load!(struct, field, type, value, adapter) do
case Ecto.Type.adapter_load(adapter, type, value) do
{:ok, value} -> value
:error -> bad_load!(field, type, value, struct)
end
end
defp load!(struct, field, type, value, loader) do
case loader.(type, value) do
{:ok, value} -> value
:error -> bad_load!(field, type, value, struct)
end
end
defp bad_load!(field, type, value, struct) do
raise ArgumentError,
"cannot load `#{inspect(value)}` as type #{inspect(type)} " <>
"for field `#{field}`#{error_data(struct)}"
end
defp error_data(%{__struct__: atom}) do
" in schema #{inspect(atom)}"
end
defp error_data(other) when is_map(other) do
""
end
end
Oops, something went wrong.

0 comments on commit c34046b

Please sign in to comment.