Skip to content

Commit

Permalink
Use INSERT ... ON DUPLICATE KEY UPDATE for upsert on mysql
Browse files Browse the repository at this point in the history
This can be used for all upsert expressions (where REPLACE INTO used
previously were only possible to use for subset of queries), and may
potentially help with deadlocks reported by mysql when we issues multiple
querier for same key in quick succession.
  • Loading branch information
prefiks committed Jun 10, 2022
1 parent a6101cc commit a89b1f3
Showing 1 changed file with 52 additions and 45 deletions.
97 changes: 52 additions & 45 deletions src/ejabberd_sql_pt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -570,43 +570,33 @@ parse_upsert_field1([C | S], Acc, ParamPos, Loc) ->

make_sql_upsert(Table, ParseRes, Pos) ->
check_upsert(ParseRes, Pos),
HasInsertOnlyFields = lists:any(
fun({_, {false}, _}) -> true;
(_) -> false
end, ParseRes),
MySqlReplace = case HasInsertOnlyFields of
false ->
[erl_syntax:clause(
[erl_syntax:atom(mysql), erl_syntax:underscore()],
[],
[make_sql_upsert_mysql(Table, ParseRes),
erl_syntax:atom(ok)])];
_ ->
[]
end,
erl_syntax:fun_expr(
[erl_syntax:clause(
[erl_syntax:atom(pgsql), erl_syntax:variable("__Version")],
[erl_syntax:infix_expr(
erl_syntax:variable("__Version"),
erl_syntax:operator('>='),
erl_syntax:integer(90500))],
[make_sql_upsert_pgsql905(Table, ParseRes),
erl_syntax:atom(ok)]),
erl_syntax:clause(
[erl_syntax:atom(pgsql), erl_syntax:variable("__Version")],
[erl_syntax:infix_expr(
erl_syntax:variable("__Version"),
erl_syntax:operator('>='),
erl_syntax:integer(90100))],
[make_sql_upsert_pgsql901(Table, ParseRes),
erl_syntax:atom(ok)])] ++
MySqlReplace ++
[erl_syntax:clause(
[erl_syntax:underscore(), erl_syntax:underscore()],
none,
[make_sql_upsert_generic(Table, ParseRes)])
]).
[erl_syntax:clause(
[erl_syntax:atom(pgsql), erl_syntax:variable("__Version")],
[erl_syntax:infix_expr(
erl_syntax:variable("__Version"),
erl_syntax:operator('>='),
erl_syntax:integer(90500))],
[make_sql_upsert_pgsql905(Table, ParseRes),
erl_syntax:atom(ok)]),
erl_syntax:clause(
[erl_syntax:atom(pgsql), erl_syntax:variable("__Version")],
[erl_syntax:infix_expr(
erl_syntax:variable("__Version"),
erl_syntax:operator('>='),
erl_syntax:integer(90100))],
[make_sql_upsert_pgsql901(Table, ParseRes),
erl_syntax:atom(ok)]),
erl_syntax:clause(
[erl_syntax:atom(mysql), erl_syntax:underscore()],
[],
[make_sql_upsert_mysql(Table, ParseRes),
erl_syntax:atom(ok)]),
erl_syntax:clause(
[erl_syntax:underscore(), erl_syntax:underscore()],
none,
[make_sql_upsert_generic(Table, ParseRes)])
]).

make_sql_upsert_generic(Table, ParseRes) ->
Update = make_sql_query(make_sql_upsert_update(Table, ParseRes)),
Expand Down Expand Up @@ -672,9 +662,6 @@ make_sql_upsert_update(Table, ParseRes) ->
State.

make_sql_upsert_insert(Table, ParseRes) ->
make_sql_upsert_insert_replace(Table, ParseRes, "INSERT").

make_sql_upsert_insert_replace(Table, ParseRes, Keyword) ->
Vals =
lists:map(
fun({_Field, _, ST}) ->
Expand All @@ -687,23 +674,43 @@ make_sql_upsert_insert_replace(Table, ParseRes, Keyword) ->
end, ParseRes),
State =
concat_states(
[#state{'query' = [{str, Keyword ++" INTO "}, {str, Table}, {str, "("}]},
[#state{'query' = [{str, "INSERT INTO "}, {str, Table}, {str, "("}]},
join_states(Fields, ", "),
#state{'query' = [{str, ") VALUES ("}]},
join_states(Vals, ", "),
#state{'query' = [{str, ");"}]}
]),
State.

make_sql_upsert_replace(Table, ParseRes) ->
make_sql_upsert_insert_replace(Table, ParseRes, "REPLACE").

make_sql_upsert_mysql(Table, ParseRes) ->
Replace = make_sql_query(make_sql_upsert_replace(Table, ParseRes)),
Vals =
lists:map(
fun({_Field, _, ST}) ->
ST
end, ParseRes),
{Fields, Set} =
lists:foldr(
fun({Field, key, _ST}, {F, S}) ->
{[#state{'query' = [{str, Field}]} | F], S};
({Field, {false}, _ST}, {F, S}) ->
{[#state{'query' = [{str, Field}]} | F], S};
({Field, {true}, _ST}, {F, S}) ->
{[#state{'query' = [{str, Field}]} | F],
[#state{'query' = [{str, Field}, {str, "=VALUES("}, {str, Field}, {str, ")"}]} | S]}
end, {[], []}, ParseRes),
Insert =
concat_states(
[#state{'query' = [{str, "INSERT INTO "}, {str, Table}, {str, "("}]},
join_states(Fields, ", "),
#state{'query' = [{str, ") VALUES ("}]},
join_states(Vals, ", "),
#state{'query' = [{str, ") ON DUPLICATE KEY UPDATE "}]},
join_states(Set, ", ")
]),
erl_syntax:application(
erl_syntax:atom(ejabberd_sql),
erl_syntax:atom(sql_query_t),
[Replace]).
[make_sql_query(Insert)]).

make_sql_upsert_pgsql901(Table, ParseRes0) ->
ParseRes = lists:map(
Expand Down

0 comments on commit a89b1f3

Please sign in to comment.