Skip to content

Commit

Permalink
Implemented failed read/write replies to users for writes/reads which…
Browse files Browse the repository at this point in the history
… don't succeed.
  • Loading branch information
lethain committed Dec 2, 2009
1 parent 53b0704 commit 7a5dc77
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 6 deletions.
37 changes: 33 additions & 4 deletions kvs.erl
Expand Up @@ -39,7 +39,9 @@ get(Key, Timeout) ->
Pid ! {self(), get, Key},
receive
{Pid, got, Value} ->
{ok, Value}
{ok, Value};
{error, Error} ->
{error, Error}
after
Timeout ->
{error, timeout}
Expand All @@ -56,7 +58,9 @@ set(Key, Val, Timeout) ->
Pid ! {self(), set, Key, Val},
receive
{Pid, received, {set, Key, Val}} ->
{ok, updated}
{ok, updated};
{error, Error} ->
{error, Error}
after
Timeout ->
{error, timeout}
Expand Down Expand Up @@ -94,7 +98,7 @@ store(Store = #kvs_store{data=Data, pending_reads=Reads, pending_writes=Writes})
pending_reads=proplists:delete({Key, Value}, Reads)});
{Count, Values, Timestamp} ->
store(Store#kvs_store{
pending_reads=[{{Client, Key}, {Count-1, [Value | Values], Timestamp}},
pending_reads=[{{Client, Key}, {Count-1, [Value | Values], Timestamp}} |
proplists:delete({Client, Key}, Reads)]})
end;
{Sender, set, Key, Value} ->
Expand All @@ -120,13 +124,38 @@ store(Store = #kvs_store{data=Data, pending_reads=Reads, pending_writes=Writes})
pending_writes=proplists:delete({Key, Value}, Writes)});
_ ->
store(Store#kvs_store{
pending_writes=[{{Client, Key}, {Count-1, Timestamp}},
pending_writes=[{{Client, Key}, {Count-1, Timestamp}} |
proplists:delete({Client, Key}, Writes)]})
end;
stop ->
ok
after
?KVS_POLL_PENDING ->
Writes2 = lists:filter(fun filter_writes/1, Writes),
Reads2 = lists:filter(fun filter_reads/1, Reads),
store(Store#kvs_store{pending_writes=Writes2, pending_reads=Reads2})
end.

filter_writes({{Client, _Key}, {_Count, Ts}}) ->
Now = ts(),
if Now > Ts + ?KVS_WRITE_TIMEOUT ->
Client ! {error, write_failed},
false;
true ->
true
end.

filter_reads({{Client, _Key}, {_Count, _Values, Ts}}) ->
Now = ts(),
if Now > Ts + ?KVS_READ_TIMEOUT ->
Client ! {error, read_failed},
false;
true ->
true
end.



ts() ->
{Mega, Sec, _} = erlang:now(),
(Mega * 1000000) + Sec.
5 changes: 3 additions & 2 deletions kvs.hrl
@@ -1,6 +1,7 @@
-define(TIMEOUT, infinity).
-define(KVS_WRITE_TIMEOUT, 1000).
-define(KVS_READ_TIMEOUT, 1000).
-define(KVS_WRITE_TIMEOUT, 2).
-define(KVS_READ_TIMEOUT, 1).
-define(KVS_POLL_PENDING, 500).
-define(KVS_WRITES, 3).
-define(KVS_READS, 3).
-record(kvs_store, {data, pending_reads, pending_writes}).

0 comments on commit 7a5dc77

Please sign in to comment.