Skip to content
This repository has been archived by the owner on May 2, 2023. It is now read-only.

Commit

Permalink
stop script on first unsuccessful op (after several retries), add sev…
Browse files Browse the repository at this point in the history
…eral error messages (#186)
  • Loading branch information
goncalotomas committed Feb 12, 2018
1 parent 45de3c4 commit e1e6331
Showing 1 changed file with 50 additions and 37 deletions.
87 changes: 50 additions & 37 deletions scripts/populate_fmke.escript
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,11 @@ populate(Database, ConfigFile, FMKeNodes) ->
pong ->
io:format("Populating ~p...\n", [Database]),
case populate_db(Nodes, Config) of
{ok, 0, _} ->
{ok, 0} ->
io:format("Population unsuccessful, please check if the database already contains records from previous benchmarks.~n"),
halt(1);
{ok, NumOkOps, NumUnsuccessfulOps} ->
io:format("Successfully populated ~p (~p insertions out of ~p).~n",
[Database, NumOkOps, NumOkOps+NumUnsuccessfulOps])
{ok, Ops} ->
io:format("Successfully populated ~p (~p insertions).~n", [Database, Ops])
end
end.

Expand All @@ -78,12 +77,12 @@ multi_ping([H|T]) ->
end.

populate_db(Nodes, Config) ->
{ok, S1, E1} = add_patients(Nodes, Config#fmkeconfig.numpatients),
{ok, S2, E2} = add_pharmacies(Nodes, Config#fmkeconfig.numpharmacies),
{ok, S3, E3} = add_facilities(Nodes, Config#fmkeconfig.numfacilities),
{ok, S4, E4} = add_staff(Nodes, Config#fmkeconfig.numstaff),
{ok, S5, E5} = add_prescriptions(Nodes, Config#fmkeconfig.numprescriptions, Config),
{ok, S1 + S2 + S3 + S4 + S5, E1 + E2 + E3 + E4 + E5}.
{ok, S1} = add_patients(Nodes, Config#fmkeconfig.numpatients),
{ok, S2} = add_pharmacies(Nodes, Config#fmkeconfig.numpharmacies),
{ok, S3} = add_facilities(Nodes, Config#fmkeconfig.numfacilities),
{ok, S4} = add_staff(Nodes, Config#fmkeconfig.numstaff),
{ok, S5} = add_prescriptions(Nodes, Config#fmkeconfig.numprescriptions, Config),
{ok, S1 + S2 + S3 + S4 + S5}.

parallel_create(Name, Amount, Fun) ->
NumProcs = ?NUMTHREADS,
Expand All @@ -97,10 +96,10 @@ spawn_workers(Pid, ProcsLeft, [H|T], Fun) ->
spawn_workers(Pid, ProcsLeft - 1, T, Fun).

supervisor_loop(Name, NumOps, Total) ->
supervisor_loop(Name, NumOps, Total, {0, 0}).
supervisor_loop(Name, NumOps, Total, 0).

supervisor_loop(_Name, Total, Total, {Suc, Err}) -> {ok, Suc, Err};
supervisor_loop(Name, NumOps, Total, {Suc, Err}) ->
supervisor_loop(_Name, Total, Total, Ops) -> {ok, Ops};
supervisor_loop(Name, NumOps, Total, Ops) ->
receive
{done, ok, _SeqNumber} ->
CurrentProgress = 100 * (NumOps + 1) / Total,
Expand All @@ -112,10 +111,7 @@ supervisor_loop(Name, NumOps, Total, {Suc, Err}) ->
false ->
ok
end,
supervisor_loop(Name, NumOps + 1, Total, {Suc + 1, Err});
{done, {error, _Reason}, _SeqNumber} ->
% io:format("Error creating ~p #~p...~n~p~n", [Name, SeqNumber, Reason]),
supervisor_loop(Name, NumOps + 1, Total, {Suc, Err + 1})
supervisor_loop(Name, NumOps + 1, Total, Ops + 1)
end.

create(Pid, Id, Fun) ->
Expand Down Expand Up @@ -167,20 +163,16 @@ add_prescriptions(_Nodes, 0, _Config) -> ok;
add_prescriptions(Nodes, Amount, Config) when Amount > 0 ->
io:format("Creating prescriptions...~n"),
ListPatientIds = gen_sequence(Config#fmkeconfig.numpatients, ?ZIPF_SKEW, Config#fmkeconfig.numprescriptions),
add_prescription_rec(Nodes, Amount, ListPatientIds, Config, {0, 0}).
add_prescription_rec(Nodes, Amount, ListPatientIds, Config, 0).

add_prescription_rec(_Nodes, 0, _PatientIds, _Config, {Suc, Err}) -> {ok, Suc, Err};
add_prescription_rec(Nodes, PrescriptionId, ListPatientIds, FmkConfig, {Suc, Err}) ->
add_prescription_rec(_Nodes, 0, _PatientIds, _Config, Ops) -> {ok, Ops};
add_prescription_rec(Nodes, PrescriptionId, ListPatientIds, FmkConfig, Ops) ->
[CurrentId | Tail] = ListPatientIds,
PharmacyId = rand:uniform(FmkConfig#fmkeconfig.numpharmacies),
PrescriberId = rand:uniform(FmkConfig#fmkeconfig.numstaff),
Node = lists:nth(PrescriptionId rem length(Nodes) + 1, Nodes),
Result = run_op(Node, create_prescription, [PrescriptionId, CurrentId, PrescriberId, PharmacyId, gen_random_date(), gen_random_drugs()]),
{Suc2, Err2} = case Result of
ok -> {Suc + 1, Err};
{error, _Reason} -> {Suc, Err + 1}
end,
add_prescription_rec(Nodes, PrescriptionId - 1, Tail, FmkConfig, {Suc2, Err2}).
ok = run_op(Node, create_prescription, [PrescriptionId, CurrentId, PrescriberId, PharmacyId, gen_random_date(), gen_random_drugs()]),
add_prescription_rec(Nodes, PrescriptionId - 1, Tail, FmkConfig, Ops + 1).

run_op(FmkNode, create_pharmacy, Params) ->
[_Id, _Name, _Address] = Params,
Expand All @@ -199,20 +191,41 @@ run_op(FmkNode, create_prescription, Params) ->
run_rpc_op(FmkNode, create_prescription, Params).

run_rpc_op(FmkNode, Op, Params) ->
run_rpc_op(FmkNode, Op, Params, 0, ?MAX_RETRIES).
run_rpc_op(FmkNode, Op, Params, 0, ?MAX_RETRIES, none).

run_rpc_op(_FmkNode, Op, Params, MaxTries, MaxTries) ->
io:format("Error calling ~p(~p), tried ~p times\n", [Op, Params, MaxTries]),
run_rpc_op(_FmkNode, Op, Params, MaxTries, MaxTries, none) ->
io:format("FMKe node is reachable but operation timed out. Check status of FMKe node~n", []),
{error, exceeded_num_retries};
run_rpc_op(FmkNode, Op, Params, CurrentTry, MaxTries) ->
run_rpc_op(_FmkNode, Op, Params, MaxTries, MaxTries, Error) ->
case Error of
nodedown ->
io:format("FMKe node is down or unreachable. Check link between populator and FMKe~n", []);
timeout ->
io:format("FMKe node is reachable but operation timed out. Check status of FMKe node~n", []);
no_connection ->
io:format("FMKe is not connected to the database. Please check the link between FMKe and the database.~n", []);
broken_link ->
io:format("Connection between FMKe and DB is unstable/broken. Please check the link between FMKe and the database.~n", [])
end,
halt(2);
run_rpc_op(FmkNode, Op, Params, CurrentTry, MaxTries, Error) ->
case rpc:call(FmkNode, fmke, Op, Params) of
{badrpc,timeout} ->
run_rpc_op(FmkNode, Op, Params, CurrentTry + 1, MaxTries);
{error, Reason} ->
% io:format("Error ~p in ~p with params ~p\n", [Reason, Op, Params]),
{error, Reason};
ok -> ok
end.
{badrpc, nodedown} ->
run_rpc_op(FmkNode, Op, Params, CurrentTry + 1, MaxTries, nodedown);
{badrpc, timeout} ->
run_rpc_op(FmkNode, Op, Params, CurrentTry + 1, MaxTries, none);
{badrpc, {'EXIT', {{{badmatch, {error, no_connection}}, _DriverStackTrace}, _AppStackTrace}}} ->
run_rpc_op(FmkNode, Op, Params, CurrentTry + 1, MaxTries, no_connection);
{badrpc, {'EXIT', {noproc, {gen_server, call, _AppStackTrace}}}} ->
run_rpc_op(FmkNode, Op, Params, CurrentTry + 1, MaxTries, broken_link);
{badrpc, {'EXIT', {timeout, {gen_server, call, _AppStackTrace}}}} ->
run_rpc_op(FmkNode, Op, Params, CurrentTry + 1, MaxTries, broken_link);
{badrpc, {'EXIT', {{{badmatch, {error, tcp_closed, _Pid}}, _DriverStackTrace}, {gen_server, call, _AppStackTrace}}}} ->
run_rpc_op(FmkNode, Op, Params, CurrentTry + 1, MaxTries, broken_link);
{badrpc, {'EXIT', {{{badmatch,{error,tcp_closed}}, _DriverStackTrace}, {gen_server,call, _AppStackTrace}}}} ->
run_rpc_op(FmkNode, Op, Params, CurrentTry + 1, MaxTries, broken_link);
ok -> ok
end.

gen_sequence(Size, Skew, SequenceSize) ->
Bottom = 1 / (lists:foldl(fun(X, Sum) -> Sum + (1 / math:pow(X, Skew)) end, 0, lists:seq(1, Size))),
Expand Down

0 comments on commit e1e6331

Please sign in to comment.