All - Ecto.Adapters.SQL.Connection #40

Open
Danwhy opened this issue Feb 14, 2019 · 19 comments
Assignees
Labels
enhancement New feature or request in-progress

Comments

@Danwhy
Member

Danwhy commented Feb 14, 2019

Part of #38

https://hexdocs.pm/ecto_sql/Ecto.Adapters.SQL.Connection.html#c:all/1

We need to implement this callback so that it returns the latest version of all rows that match the query parameter.

@RobStallion has already done some research into how we can modify the passed-in query, which we can then pass on to the Ecto.Adapters.Postgres.Connection.all function:

RobStallion/alog_adapter#1
https://stackoverflow.com/questions/54690701/is-there-a-way-to-ensure-where-clause-happens-after-distinct

@Danwhy Danwhy added the enhancement New feature or request label Feb 14, 2019
@RobStallion
Member

@Danwhy I just got the following error when trying to run a rollback command on the module where I have been testing this dummy adapter...

** (Postgrex.Error) ERROR 42703 (undefined_column) column s0.comment_id_no does not exist

    query: SELECT DISTINCT ON (s0."comment_id_no") s0."version"::bigint FROM "schema_migrations" AS s0 FOR UPDATE
    (ecto_sql) lib/ecto/adapters/sql.ex:624: Ecto.Adapters.SQL.raise_sql_call_error/1
    (ecto_sql) lib/ecto/adapters/sql.ex:557: Ecto.Adapters.SQL.execute/5
    (ecto) lib/ecto/repo/queryable.ex:147: Ecto.Repo.Queryable.execute/4
    (ecto) lib/ecto/repo/queryable.ex:18: Ecto.Repo.Queryable.all/3
    (ecto_sql) lib/ecto/migrator.ex:316: anonymous fn/3 in Ecto.Migrator.lock_for_migrations/3
    (ecto_sql) lib/ecto/adapters/sql.ex:820: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
    (db_connection) lib/db_connection.ex:1355: DBConnection.run_transaction/4
    (ecto_sql) lib/ecto/adapters/sql.ex:727: Ecto.Adapters.SQL.lock_for_migrations/5
    (ecto_sql) lib/ecto/migrator.ex:318: Ecto.Migrator.lock_for_migrations/3
    (ecto_sql) lib/mix/tasks/ecto.rollback.ex:106: anonymous fn/4 in Mix.Tasks.Ecto.Rollback.run/2
    (elixir) lib/enum.ex:765: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir) lib/enum.ex:765: Enum.each/2
    (mix) lib/mix/task.ex:316: Mix.Task.run_task/3
    (mix) lib/mix/cli.ex:79: Mix.CLI.run_task/2
    (elixir) lib/code.ex:767: Code.require_file/2

This is the line that is causing the issue - https://github.com/RobStallion/alog_adapter/blob/master/lib/connection.ex#L36

This appears to be because not all the tables created in the module contain the column "comment_id_no" (equivalent to entry_id).

This could be a potential issue in trying to create an ecto adapter.

@Danwhy
Member Author

Danwhy commented Feb 15, 2019

@RobStallion Yeah I'd assume that the problem is that the schema_migrations table is automatically created. But we can have full control over all tables being created, so we can either make sure the "entry_id" is on all created tables, or have a clause that catches the exceptions.

@Danwhy
Member Author

Danwhy commented Feb 19, 2019

I've been having a look into the flow of how the Repo.all query is called:

Repo.all -> Ecto.Repo.Queryable.all -> Ecto.Repo.Queryable.execute -> Ecto.Query.Planner.query (Ecto.Query.Planner.plan) -> adapter.execute (Ecto.Adapters.SQL.execute) -> adapter.execute! -> sql_call

It looks like the sources are added in the Ecto.Query.Planner.plan call, so we may be able to call this function ourselves to prepare the query in our all function

@SimonLab
Member

SimonLab commented Feb 19, 2019

I did something similar yesterday to see how the all function was created. I was a bit confused by the execute function, as the adapter also has one but with only 4 parameters; I figured out later that it was the one used by Ecto.Adapters.SQL, which is defined with 5 parameters.
See also dwyl/learn-elixir#116 for the best way to trace function calls.

The sources look to be used (but not defined) in the Ecto.Adapters.SQL.execute function, via put_source:

  def execute(adapter_meta, query_meta, prepared, params, opts) do
    %{num_rows: num, rows: rows} =
      execute!(adapter_meta, prepared, params, put_source(opts, query_meta))

    {num, rows}
  end

https://github.com/elixir-ecto/ecto_sql/blob/3a72eb111f8613c8eada296bcd3dd89883a52b5a/lib/ecto/adapters/sql.ex#L554-L560

@Danwhy
Member Author

Danwhy commented Feb 19, 2019

I think by the time it gets to that function, our adapter's all function has already been called; that's just where the SQL it generates is actually executed.

@SimonLab SimonLab self-assigned this Feb 19, 2019
@SimonLab
Member

SimonLab commented Feb 19, 2019

Describing the trace call for the Repo.all function to find out where exactly the all function from our adapter will be called and with which parameter structure

  • Repo.all(queryable, opts):

This function is defined in the Ecto.Repo module on line 237: https://github.com/elixir-ecto/ecto/blob/2d564df57d29ef021f564af36e4b3ab86f902554/lib/ecto/repo.ex#L237-L239

  def all(queryable, opts \\ []) do
    Ecto.Repo.Queryable.all(__MODULE__, queryable, opts)
  end

  • Ecto.Repo.Queryable.all(name, queryable, opts):

  def all(name, queryable, opts) when is_list(opts) do
    query =
      queryable
      |> Ecto.Queryable.to_query
      |> Ecto.Query.Planner.ensure_select(true)
      |> attach_prefix(opts)

    execute(:all, name, query, opts) |> elem(1)
  end

This function prepares the query which will be passed to the execute function.

  • Ecto.Query.Planner.ensure_select(query, true) sets a default select expression when the query has none:

  def ensure_select(%{select: nil} = query, true) do
    %{query | select: %SelectExpr{expr: {:&, [], [0]}, line: __ENV__.line, file: __ENV__.file}}
  end

  • attach_prefix(query, opts) is a helper defined in Ecto.Repo.Queryable as:

  defp attach_prefix(query, opts) do
    case Keyword.fetch(opts, :prefix) do
      {:ok, prefix} -> %{query | prefix: prefix}
      :error -> query
    end
  end

It adds the prefix defined in the options to the query. Options are a keyword list, i.e. a list of two-element tuples such as [{:opt1, true}, {:opt2, "test"}], which can also be written [opt1: true, opt2: "test"].
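
As a small illustration of the keyword-list handling above, here is a minimal sketch that can be pasted into iex without Ecto. The module name PrefixSketch and the plain map standing in for the %Ecto.Query{} struct are assumptions made for the example; only the Keyword.fetch/2 logic mirrors the helper quoted above.

```elixir
defmodule PrefixSketch do
  # Same shape as the private helper quoted above, made public for the demo.
  def attach_prefix(query, opts) do
    case Keyword.fetch(opts, :prefix) do
      {:ok, prefix} -> %{query | prefix: prefix}
      :error -> query
    end
  end
end

# A plain map stands in for the %Ecto.Query{} struct (assumption for the demo).
query = %{from: "comments", prefix: nil}

# Both spellings below build the same keyword list.
IO.inspect(PrefixSketch.attach_prefix(query, prefix: "tenant_a"))
IO.inspect(PrefixSketch.attach_prefix(query, [{:prefix, "tenant_a"}]))

# Without a :prefix option the query is returned unchanged.
IO.inspect(PrefixSketch.attach_prefix(query, []))
```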

  defp execute(operation, name, query, opts) when is_list(opts) do
    {adapter, %{cache: cache} = adapter_meta} = Ecto.Repo.Registry.lookup(name)
    {query_meta, prepared, params} = Planner.query(query, operation, cache, adapter, 0)

    case query_meta do
      %{select: nil} ->
        adapter.execute(adapter_meta, query_meta, prepared, params, opts)
      %{select: select, sources: sources, preloads: preloads} ->
        %{
          preprocess: preprocess,
          postprocess: postprocess,
          take: take,
          assocs: assocs,
          from: from
        } = select

        preprocessor = preprocessor(from, preprocess, adapter)
        {count, rows} = adapter.execute(adapter_meta, query_meta, prepared, params, opts)
        postprocessor = postprocessor(from, postprocess, take, adapter)

        {count,
          rows
          |> Ecto.Repo.Assoc.query(assocs, sources, preprocessor)
          |> Ecto.Repo.Preloader.query(name, preloads, take, postprocessor, opts)}
    end
  end

  • Ecto.Repo.Registry.lookup(name) checks that the repo exists:

  def lookup(repo) when is_atom(repo) do
    # Returns the process id of the repo, or raises (line below) if it is not found
    GenServer.whereis(repo)
    |> Kernel.||(raise "could not lookup #{inspect repo} because it was not started or it does not exist")
    |> lookup()
  end

  # Uses ets to retrieve the adapter associated with the process id of the repo
  def lookup(pid) when is_pid(pid) do
    :ets.lookup_element(__MODULE__, pid, 3)
  end

  • Ecto.Query.Planner.query(query, operation, cache, adapter, counter):

  def query(query, operation, cache, adapter, counter) do
    {query, params, key} = plan(query, operation, adapter, counter)
    query_with_cache(key, query, operation, cache, adapter, counter, params)
  end

The Ecto.Query.Planner.query function calls the Ecto.Query.Planner.plan function: https://github.com/elixir-ecto/ecto/blob/d2bca3d36476cc92d2e761ab2d99c130c9ad83d5/lib/ecto/query/planner.ex#L206

  @doc """
  Prepares the query for cache.
  This means all the parameters from query expressions are
  merged into a single value and their entries are pruned
  from the query.
  This function is called by the backend before invoking
  any cache mechanism.
  """
  def plan(query, operation, adapter, counter) do
    query
    |> plan_sources(adapter)
    |> plan_assocs
    |> plan_combinations(operation, adapter, counter)
    |> plan_cache(operation, adapter, counter)
  rescue
    e ->
      # Reraise errors so we ignore the planner inner stacktrace
      filter_and_reraise e, System.stacktrace
  end

So I think the plan function is just preparing the query.

then query_with_cache(key, query, operation, cache, adapter, counter, params) is called:

  defp query_with_cache(key, query, operation, cache, adapter, counter, params) do
    case query_lookup(key, query, operation, cache, adapter, counter) do
      {_, select, prepared} ->
        {build_meta(query, select), {:nocache, prepared}, params}
      {_key, :cached, select, cached} ->
        update = &cache_update(cache, key, &1)
        reset = &cache_reset(cache, key, &1)
        {build_meta(query, select), {:cached, update, reset, cached}, params}
      {_key, :cache, select, prepared} ->
        update = &cache_update(cache, key, &1)
        {build_meta(query, select), {:cache, update, prepared}, params}
    end
  end

which calls query_lookup(key, query, operation, cache, adapter, counter). This function checks whether the query already exists in the ets cache; if it does not, it prepares the query:

  defp query_lookup(key, query, operation, cache, adapter, counter) do
    case :ets.lookup(cache, key) do
      [term] -> term
      [] -> query_prepare(query, operation, adapter, counter, cache, key)
    end
  end
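
The lookup-or-prepare pattern used by query_lookup can be tried in isolation. The following is only a sketch under stated assumptions: CacheSketch, lookup_or_prepare/3 and the {key, value} tuple layout are hypothetical and much simpler than what Ecto actually stores in its cache; the :ets calls themselves are the standard ones.

```elixir
defmodule CacheSketch do
  # Returns {:hit, value} when the key is already in the table; otherwise
  # runs prepare_fun, stores the result and returns {:miss, value}.
  def lookup_or_prepare(cache, key, prepare_fun) do
    case :ets.lookup(cache, key) do
      [{^key, value}] ->
        {:hit, value}

      [] ->
        value = prepare_fun.()
        true = :ets.insert(cache, {key, value})
        {:miss, value}
    end
  end
end

# Hypothetical table name and query text, used only for the demo.
cache = :ets.new(:query_cache_sketch, [:set, :public])
prepare = fn -> "SELECT c0.\"id\" FROM \"comments\" AS c0" end

# The first call prepares and stores the query, the second one finds it.
IO.inspect(CacheSketch.lookup_or_prepare(cache, :all_comments, prepare))
IO.inspect(CacheSketch.lookup_or_prepare(cache, :all_comments, prepare))
```

The second call never runs prepare, which matches the observation in this trace that the adapter is only reached when the query is not already cached.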

Then query_prepare(query, operation, adapter, counter, cache, key):

  defp query_prepare(query, operation, adapter, counter, cache, key) do
    case query_without_cache(query, operation, adapter, counter) do
      {:cache, select, prepared} ->
        cache_insert(cache, key, {key, :cache, select, prepared})
      {:nocache, _, _} = nocache ->
        nocache
    end
  end

Which runs query_without_cache(query, operation, adapter, counter):


  defp query_without_cache(query, operation, adapter, counter) do
    {query, select} = normalize(query, operation, adapter, counter)
    {cache, prepared} = adapter.prepare(operation, query)
    {cache, select, prepared}
  end

And we can see that after the query has been normalised the adapter.prepare(operation, query) function is called. This function is defined in Ecto.Adapters.SQL https://github.com/elixir-ecto/ecto_sql/blob/3a72eb111f8613c8eada296bcd3dd89883a52b5a/lib/ecto/adapters/sql.ex#L134

  def prepare(:all, query) do
    {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.all(query))}}
  end

We can see that @conn.all(query) is called, and @conn is defined as @conn __MODULE__.Connection, which is our adapter's Connection module!

I think I have managed to follow the lifecycle of the Repo.all function. There are other helper functions that are called, but by doing this trace I understood that the query is first "prepared", then looked up in ets, and if it is not there the adapter is called and the correct function is triggered. Then Postgrex has the responsibility of sending the query to Postgres and retrieving the result.
Now that we have a better idea of where the query is transformed, we can try to copy or adapt some of the helper functions above to add SQL clauses for the alog logic (unique uuid, latest timestamp, ...).

@SimonLab
Member

SimonLab commented Feb 19, 2019

Trying to recreate a similar subquery to the one currently on alog:

    sub = from (m in subquery(a1)), distinct: m.last_name, order_by: m.first_name, select: m
    query = from a in subquery(sub), where: not a.deleted, select: a
    Ecto.Adapters.Postgres.Connection.all(query)

where a1 is the query the adapter all function is getting

image

The error comes from the create_names function here: https://github.com/elixir-ecto/ecto_sql/blob/3a72eb111f8613c8eada296bcd3dd89883a52b5a/lib/ecto/adapters/postgres/connection.ex#L654

    defp create_names(%{sources: sources}) do
      create_names(sources, 0, tuple_size(sources)) |> List.to_tuple()
    end

The error is raised by tuple_size(sources), where it seems that sources is nil.
I think that the subquery function returns a query struct which is different from the one produced after the initial steps of the adapter, which makes them incompatible.

As @Danwhy wrote above: "It looks like the sources are added in the Ecto.Query.Planner.plan call, so we may be able to call this function ourselves to prepare the query in our all function"

Looking into whether we can reuse this function.

@SimonLab
Member

Trying to apply plan or plan_sources from Ecto.Query.Planner returns other errors.
I will instead

@RobStallion
Member

Looking at @SimonLab 's comment, the following line appears to be where the adapter is called...

{count, rows} = adapter.execute(adapter_meta, query_meta, prepared, params, opts)

This calls the following function in the adapter...

def execute(adapter_meta, query_meta, query, params, opts) do
  Ecto.Adapters.SQL.execute(adapter_meta, query_meta, query, params, opts)
end

This function is defined here.

@RobStallion
Member

RobStallion commented Feb 26, 2019

The arguments passed to this function are...

[
  adapter_meta: %{
    cache: #Reference<0.3974704928.2992242689.138205>,
    opts: [timeout: 15000, pool_size: 10, pool: DBConnection.ConnectionPool],
    pid: #PID<0.2848.0>,
    sql: AlogAdapter.Connection,
    telemetry: {UsingAlogAdapter.Repo, :debug, [],
     [:using_alog_adapter, :repo, :query]}
  },
  opts: [],
  params: [],
  prepared: {:cache,
   #Function<29.104601620/1 in Ecto.Query.Planner.query_with_cache/7>,
   {134695,
    "SELECT c0.\"id\", c0.\"comment\", c0.\"comment_id_no\", c0.\"show\", c0.\"cid\", c0.\"entry_id\", c0.\"inserted_at\", c0.\"updated_at\" FROM \"comments\" AS c0"}},
  query_meta: %{
    preloads: [],
    select: %{
      assocs: [],
      from: {:any,
       {:source, {"comments", UsingAlogAdapter.Comments}, nil,
        [
          id: :id,
          comment: :string,
          comment_id_no: :string,
          show: :boolean,
          cid: :string,
          entry_id: :string,
          inserted_at: :naive_datetime,
          updated_at: :naive_datetime
        ]}},
      postprocess: {:source, :from},
      preprocess: [source: :from],
      take: []
    },
    sources: {{"comments", UsingAlogAdapter.Comments, nil}}
  }
]

To clarify, the original call was

Repo.all(Comments)

@RobStallion
Member

Next step

I am going to pass a subquery to Repo.all and compare the two sets of arguments that are passed to the adapter's execute/5 function.

@RobStallion
Member

[
  adapter_meta: %{
    cache: #Reference<0.872129942.2461401090.223629>,
    opts: [timeout: 15000, pool_size: 10, pool: DBConnection.ConnectionPool],
    pid: #PID<0.329.0>,
    sql: AlogAdapter.Connection,
    telemetry: {UsingAlogAdapter.Repo, :debug, [],
     [:using_alog_adapter, :repo, :query]}
  },
  opts: [],
  params: [],
  prepared: {:cache,
   #Function<29.104601620/1 in Ecto.Query.Planner.query_with_cache/7>,
   {103,
    "SELECT DISTINCT ON (c0.\"cid\") c0.\"id\", c0.\"comment\", c0.\"comment_id_no\", c0.\"show\", c0.\"cid\", c0.\"entry_id\", c0.\"inserted_at\", c0.\"updated_at\" FROM \"comments\" AS c0 WHERE (c0.\"comment_id_no\" = '1') ORDER BY c0.\"cid\", c0.\"inserted_at\" DESC"}},
  query_meta: %{
    preloads: [],
    select: %{
      assocs: [],
      from: {:any,
       {:source, {"comments", UsingAlogAdapter.Comments}, nil,
        [
          id: :id,
          comment: :string,
          comment_id_no: :string,
          show: :boolean,
          cid: :string,
          entry_id: :string,
          inserted_at: :naive_datetime,
          updated_at: :naive_datetime
        ]}},
      postprocess: {:source, :from},
      preprocess: [source: :from],
      take: []
    },
    sources: {{"comments", UsingAlogAdapter.Comments, nil}}
  }
]

These are the arguments passed to the adapter's execute/5 function when all is called with the following subquery...

sub =
  from(c in Comments,
    distinct: c.cid,
    order_by: [desc: :inserted_at]
  )

query = from(c in sub, where: c.comment_id_no == "1")
Repo.all(query)

@RobStallion
Member

At first glance the arguments passed to our adapter look almost identical, with the obvious exception of the value under the :prepared key.

Note that :prepared is the same as :query. I logged the arguments inside the function that our adapter's
Ecto.Adapters.SQL.execute(adapter_meta, query_meta, query, params, opts)
call reaches, i.e. this line: def execute(adapter_meta, query_meta, prepared, params, opts).
This is the only reason the adapter function says query but the log says prepared.

@RobStallion
Member

Hopefully this means that we can somehow update the query that is passed to the adapter, to add a subquery to it.

Will look into possible approaches for this.

@SimonLab
Member

SimonLab commented Feb 28, 2019

My latest attempt was to try to reproduce the logic of the all function from the Postgres adapter:

    def all(query) do
      sources = create_names(query)
      {select_distinct, order_by_distinct} = distinct(query.distinct, sources, query)

      from = from(query, sources)
      select = select(query, select_distinct, sources)
      join = join(query, sources)
      where = where(query, sources)
      group_by = group_by(query, sources)
      having = having(query, sources)
      window = window(query, sources)
      combinations = combinations(query)
      order_by = order_by(query, order_by_distinct, sources)
      limit = limit(query, sources)
      offset = offset(query, sources)
      lock = lock(query.lock)

      [select, from, join, where, group_by, having, window, combinations, order_by, limit, offset | lock]
    end

This function returns nested improper lists of strings. I'm not certain exactly why improper lists are used here, but I think it might be for optimisation and to make pattern matching easier. The result is valid iodata, which is why prepare passes it to IO.iodata_to_binary.

see https://hexdocs.pm/elixir/master/List.html
"Some lists, called improper lists, do not have an empty list as the second element in the last cons cell"

Then I've looked at the private function from to understand a bit the structure of the returned value of all:

    defp from(%{from: %{source: source}} = query, sources) do
      {from, name} = get_source(query, sources, 0, source)
      [" FROM ", from, " AS " | name]
    end

So it looks like it creates an improper list which mirrors the FROM part of an SQL query.
My idea is to create or transform one of these improper lists so that it includes a subquery.
I haven't managed to get to this point yet. One idea would be to IO.inspect, in the all function of the normal Postgres adapter, a query containing a subquery to try to understand how it is formed, then try to reproduce the logic with our specific alog subquery.

Then try to repeat this process for the distinct part of the alog query.
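
To make the improper-list idea concrete, here is a minimal sketch of how a FROM fragment could wrap an inner query. IodataSketch, from_table/2 and from_subquery/2 are hypothetical names, and this is not how the Postgres adapter builds subqueries internally; it only shows that nested improper lists flatten to the expected SQL with IO.iodata_to_binary/1.

```elixir
defmodule IodataSketch do
  # Mirrors the shape of the private from/2 quoted above: an improper list
  # whose tail is the alias binary.
  def from_table(table, name), do: [" FROM ", table, " AS " | name]

  # Hypothetical variant: use an inner query (itself iodata) as the source.
  def from_subquery(inner, name), do: [" FROM (", inner, ") AS " | name]
end

inner = [
  "SELECT DISTINCT ON (d0.\"entry_id\") d0.\"name\"",
  IodataSketch.from_table("\"drink_types\"", "d0")
]

outer = ["SELECT s0.\"name\"", IodataSketch.from_subquery(inner, "s0")]

# Prints:
# SELECT s0."name" FROM (SELECT DISTINCT ON (d0."entry_id") d0."name" FROM "drink_types" AS d0) AS s0
IO.puts(IO.iodata_to_binary(outer))
```

Because the fragments are never concatenated until the final IO.iodata_to_binary/1 call, an inner query can be dropped into an outer one without copying strings.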

@RobStallion
Member

The code that will enable access to the all query can be found here.

At the moment I am unsure if this function is only called when Repo.all is called or if it is called for other Repo functions as well.

@SimonLab
Member

SimonLab commented Mar 6, 2019

Running the tests runs the migrations, which in turn run Repo.all. At the moment we are adding DISTINCT ON to all the queries, but I don't think it is needed for the migrations:
image

To get all the different parts of the query (i.e. table name, fields, table_as, ...) I was using String.split, but the code became tedious and not really readable, so I'm going to look at how to replace it with a regex. This will allow me to get the table name and create a switch to know whether Repo.all is run by a migration or by a user query.

@SimonLab
Member

SimonLab commented Mar 6, 2019

Given a query similar to:
we can retrieve the different part of the query with the following regex:

Regex.named_captures(~r/(\bSELECT\b)\s(?<fields>.*)\sFROM\s(?<table_name>.*)\sas\s(?<table_as>.*)(?<rest_query>.*)/i, query)
  • ?<name> gives a name to the element we are trying to match
  • \b...\b adds a word boundary to our match, for example we want to match "select" only and not "selected"
  • the /i at the end of the regex makes it case insensitive; it looks like Ecto uses upper case to build the query, but to be sure we can make the regex caseless.
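
For reference, a runnable sketch of this approach follows. It uses a slightly different pattern from the one above, which is an assumption on my part: table_as is matched with \S+ instead of .*, because a greedy .* would swallow the remainder of the query and leave rest_query empty. QueryPartsSketch, parts/1 and migration_query?/1 are hypothetical names, and the pattern assumes a single-line query with no other " as " later in the text.

```elixir
defmodule QueryPartsSketch do
  # Variant of the regex above: \S+ stops the alias at the first whitespace,
  # so whatever follows ends up in rest_query.
  defp pattern do
    ~r/\bSELECT\b\s(?<fields>.*)\sFROM\s(?<table_name>.*)\sAS\s(?<table_as>\S+)(?<rest_query>.*)/i
  end

  # Returns a map of the named captures, or nil when the query does not match.
  def parts(sql), do: Regex.named_captures(pattern(), sql)

  # Hypothetical switch: detect queries against Ecto's own migrations table.
  def migration_query?(sql) do
    case parts(sql) do
      %{"table_name" => "\"schema_migrations\""} -> true
      _ -> false
    end
  end
end

user_query =
  "SELECT c0.\"id\", c0.\"cid\" FROM \"comments\" AS c0 WHERE (c0.\"comment_id_no\" = '1')"

migration_query =
  "SELECT s0.\"version\"::bigint FROM \"schema_migrations\" AS s0 FOR UPDATE"

IO.inspect(QueryPartsSketch.parts(user_query))
IO.inspect(QueryPartsSketch.migration_query?(migration_query))
```

The second function is one possible shape for the switch mentioned earlier: the adapter could leave the query untouched when it targets schema_migrations and add the DISTINCT ON logic otherwise.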

@SimonLab
Member

SimonLab commented Mar 6, 2019

I think the alog all query is now ready:
image

I've recreated the following structure for the query:

# SELECT
#   s0."id",
#   s0."name",
#   s0."entry_id",
#   s0."deleted",
#   s0."inserted_at",
#   s0."updated_at"
# FROM
#   (SELECT DISTINCT ON (d0."entry_id")
#       d0."id" AS "id"
#     , d0."name" AS "name"
#     , d0."entry_id" AS "entry_id"
#     , d0."deleted" AS "deleted"
#     , d0."inserted_at" AS "inserted_at"
#     , d0."updated_at" AS "updated_at"
#   FROM "drink_types" AS d0
#   ORDER BY d0."entry_id", d0."updated_at" DESC)
# AS s0 WHERE (NOT (s0."deleted"))

I will need to write more tests to see if any error occurs.

SimonLab added a commit that referenced this issue Mar 12, 2019