Permalink
Browse files

Fixed synchronization problems

  • Loading branch information...
1 parent b9a134e commit faf7989a419da16f841bd919783367a064fcbea0 @ian-plosker committed Feb 14, 2012
Showing with 98 additions and 29 deletions.
  1. +29 −17 c_src/stm_erl.c
  2. +69 −12 src/stm.erl
View
@@ -1,4 +1,5 @@
#include <stdio.h>
+#include <string.h>
#include "erl_nif.h"
@@ -19,6 +20,10 @@ typedef struct {
unsigned int size;
} stm_erl_var;
+typedef struct {
+ stm_erl_var *var;
+} stm_erl_handle;
+
ERL_NIF_TERM stm_erl_init(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
stm_init();
return enif_make_atom(env, "ok");
@@ -31,53 +36,60 @@ ERL_NIF_TERM stm_erl_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
ERL_NIF_TERM stm_erl_trans_start(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
stm_init_thread();
- stm_start(NULL);
+ stm_tx_attr_t *_a = enif_alloc(sizeof(*_a));
+ (*_a).id = rand();
+ (*_a).no_retry = 1;
+ stm_start(_a);
return enif_make_atom(env, "ok");
}
ERL_NIF_TERM stm_erl_commit(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
- stm_commit(NULL);
+ int aborted = stm_aborted();
+ aborted ? stm_abort(STM_ABORT_EXPLICIT) : stm_commit();
stm_exit_thread();
- return enif_make_atom(env, "ok");
+ return enif_make_atom(env, aborted == 0 ? "ok" : "error");
}
ERL_NIF_TERM stm_erl_new_var(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
- stm_erl_var* var = (stm_erl_var*)enif_alloc_resource(stm_erl_RESOURCE,
- sizeof(stm_erl_var));
+ stm_erl_handle* handle = (stm_erl_handle*)enif_alloc_resource(stm_erl_RESOURCE,
+ sizeof(stm_erl_handle));
+
+ handle->var = enif_alloc(sizeof(stm_erl_var));
+
long value;
if (enif_get_int64(env, argv[0], &value)) {
- var->field = enif_alloc(sizeof(int64_t));
- var->type = INT;
- var->size = sizeof(int);
+ handle->var->field = enif_alloc(sizeof(int64_t));
+ handle->var->type = INT;
+ handle->var->size = sizeof(int);
- stm_store_long(var->field, value);
+ stm_store_long(handle->var->field, value);
}
else
return enif_make_badarg(env);
- ERL_NIF_TERM result = enif_make_resource(env, var);
- enif_release_resource(var);
+ ERL_NIF_TERM result = enif_make_resource(env, handle);
+ enif_release_resource(handle);
return enif_make_tuple2(env, enif_make_atom(env, "ok"), result);
}
ERL_NIF_TERM stm_erl_load_var(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
- stm_erl_var* var;
- if (!enif_get_resource(env, argv[0], stm_erl_RESOURCE, (void**)&var))
+ stm_erl_handle* handle;
+ if (!enif_get_resource(env, argv[0], stm_erl_RESOURCE, (void**)&handle))
return enif_make_badarg(env);
- long value = stm_load_long(var->field);
+ long value = stm_load_long(handle->var->field);
return enif_make_int64(env, value);
}
ERL_NIF_TERM stm_erl_store_var(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
- stm_erl_var* var;
+ stm_erl_handle* handle;
long value;
- if (!enif_get_resource(env, argv[1], stm_erl_RESOURCE, (void**)&var) ||
+ if (!enif_get_resource(env, argv[1], stm_erl_RESOURCE, (void**)&handle) ||
!enif_get_int64(env, argv[0], &value))
return enif_make_badarg(env);
- stm_store_long(var->field, value);
+ stm_store_long(handle->var->field, value);
return enif_make_atom(env, "ok");
}
View
@@ -3,13 +3,8 @@
-compile(export_all).
--define(atomic(X), (fun() ->
- trans_start(),
- Result = X,
- commit(),
- io:format("A: ~p~n", [ [?LINE] ]),
- Result
- end)()).
+-define(atomic(X), atomic(fun() -> X end)).
+-define(DEFAULT_TRANS_RETRIES, 5).
-on_load(init/0).
@@ -18,9 +13,22 @@
-include_lib("eunit/include/eunit.hrl").
-endif.
+atomic(Fun) -> atomic(Fun, ?DEFAULT_TRANS_RETRIES).
+
+atomic(_Fun, 0) -> {error, transaction_failed};
+atomic(Fun, N) ->
+ trans_start(),
+ Result = Fun(),
+ case commit() of
+ ok ->
+ Result;
+ error ->
+ atomic(Fun, N - 1)
+ end.
+
-spec init() -> ok | {error, any()}.
init() ->
- case code:priv_dir(dlht) of
+ case code:priv_dir(?MODULE) of
{error, bad_name} ->
case code:which(?MODULE) of
Filename when is_list(Filename) ->
@@ -31,7 +39,7 @@ init() ->
SoName = filename:join("../priv", "stm_erl")
end;
Dir ->
- SoName = filename:join(Dir, "dlcbf")
+ SoName = filename:join(Dir, "stm_erl")
end,
erlang:load_nif(SoName, 0).
@@ -88,14 +96,17 @@ store_var(_Var, _Val) ->
basic_test() ->
initialize(),
- ?assert(true),
+
{ok, Var} = ?atomic(new_var(1)),
+
Val1 = ?atomic(begin
load_var(Var)
end),
?assert(Val1 == 1),
- ok = ?atomic(store_var(2, Var)),
+
+
+ ?atomic(store_var(2, Var)),
Val2 = ?atomic(load_var(Var)),
?assert(Val2 == 2),
@@ -106,5 +117,51 @@ basic_test() ->
end),
?assert(Val3 == 3).
--endif.
+spawn_fun_n(_Fun, 0) -> ok;
+spawn_fun_n(Fun, N) ->
+ %io:format("yo~n"),
+ spawn(Fun),
+ spawn_fun_n(Fun, N - 1).
+sync_test() ->
+ initialize(),
+
+ {ok, Var} = ?atomic(new_var(0)),
+
+ Self = self(),
+ Fun = fun() ->
+ timer:sleep(random:uniform(10)),
+ case ?atomic(
+ begin
+ Val0 = load_var(Var),
+ ok = store_var(Val0 + 1, Var)
+ end
+ ) of
+ {error, _} ->
+ Self ! failed;
+ _ ->
+ Self ! successful
+ end
+ end,
+
+ spawn_fun_n(Fun, 10000),
+
+ timer:sleep(50),
+
+ Val = ?atomic(load_var(Var)),
+ ?assertEqual(gather_successful_trans_count(10000), Val).
+
+gather_successful_trans_count(N) ->
+ gather_successful_trans_count(N, 0).
+
+gather_successful_trans_count(0, Count) ->
+ Count;
+gather_successful_trans_count(N, Count) ->
+ receive
+ failed ->
+ gather_successful_trans_count(N - 1, Count);
+ successful ->
+ gather_successful_trans_count(N - 1, Count + 1)
+ end.
+
+-endif.

0 comments on commit faf7989

Please sign in to comment.