Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

encapsulate usage of ets tables #413

Merged
merged 3 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions include/antidote.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@
%% This is the time that nodes will sleep inbetween sending meta-data
%% to other physical nodes within the DC
-define(META_DATA_SLEEP, 1000).
-define(META_TABLE_NAME, a_meta_data_table).
-define(REMOTE_META_TABLE_NAME, a_remote_meta_data_table).
-define(META_TABLE_STABLE_NAME, a_meta_data_table_stable).
%% Uncomment the following line to use erlang:now()
%% Otherwise os:timestamp() is used which can go backwards
%% which is unsafe for clock-si
Expand Down
143 changes: 143 additions & 0 deletions src/antidote_ets_meta_data.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
%% -------------------------------------------------------------------
%%
%% Copyright <2013-2020> <
%% Technische Universität Kaiserslautern, Germany
%% Université Pierre et Marie Curie / Sorbonne-Université, France
%% Universidade NOVA de Lisboa, Portugal
%% Université catholique de Louvain (UCL), Belgique
%% INESC TEC, Portugal
%% >
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either expressed or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% List of the contributors to the development of Antidote: see AUTHORS file.
%% Description and complete License: see LICENSE file.
%% -------------------------------------------------------------------
-module(antidote_ets_meta_data).

-include("antidote.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("kernel/include/logger.hrl").

-define(META_TABLE_NAME, a_meta_data_table).
-define(REMOTE_META_TABLE_NAME, a_remote_meta_data_table).
-define(META_DATA_SENDER_TABLE_NAME, a_meta_data_sender_table).

-export([create_meta_data_table/1,
create_meta_data_sender_table/1,
create_remote_meta_data_table/1,
insert_meta_data_sender_merged_data/2,
insert_meta_data/3,
delete_meta_data_partition/2,
get_meta_data_sender_merged_data/2,
remote_table_ready/1,
get_meta_data_as_map/1,
get_remote_meta_data_as_map/1,
delete_meta_data_table/1,
delete_meta_data_sender_table/1,
delete_remote_meta_data_table/1,
insert_remote_meta_data/3,
insert_remote_meta_data_new/3,
delete_remote_meta_data_node/2]).

%%%===================================================================
%%% API
%%%===================================================================

- spec create_meta_data_table(atom()) -> ets:tab().
create_meta_data_table(Name) ->
ets:new(get_table_name(Name, ?META_TABLE_NAME), [set, named_table, public, ?META_TABLE_CONCURRENCY]).

- spec create_meta_data_sender_table(atom()) -> ets:tab().
create_meta_data_sender_table(Name) ->
ets:new(get_table_name(Name, ?META_DATA_SENDER_TABLE_NAME), [set, named_table, ?META_TABLE_STABLE_CONCURRENCY]).

- spec create_remote_meta_data_table(atom()) -> ets:tab().
create_remote_meta_data_table(Name) ->
ets:new(get_table_name(Name, ?REMOTE_META_TABLE_NAME), [set, named_table, protected, ?META_TABLE_CONCURRENCY]).

- spec insert_meta_data_sender_merged_data(atom(), term()) -> true.
insert_meta_data_sender_merged_data(Name, Data) ->
ets:insert(get_table_name(Name, ?META_DATA_SENDER_TABLE_NAME), {merged_data, Data}).

-spec insert_meta_data(atom(), partition_id(), term()) -> true.
insert_meta_data(Name, Partition, Data) ->
ets:insert(get_table_name(Name, ?META_TABLE_NAME), {Partition, Data}).


%% Remove meta data for partition
-spec delete_meta_data_partition(atom(), partition_id()) -> true.
delete_meta_data_partition(Name, Partition) ->
ets:delete(get_table_name(Name, ?META_TABLE_NAME), Partition).

-spec get_meta_data_sender_merged_data(atom(), X) -> X.
get_meta_data_sender_merged_data(Name, Default) ->
case ets:lookup(get_table_name(Name, ?META_DATA_SENDER_TABLE_NAME), merged_data) of
[] ->
Default;
[{merged_data, Other}] ->
Other
end.

-spec remote_table_ready(atom()) -> boolean().
remote_table_ready(Name) ->
case ets:info(get_table_name(Name, ?REMOTE_META_TABLE_NAME)) of
undefined ->
false;
_ ->
true
end.

-spec get_meta_data_as_map(atom()) -> map().
get_meta_data_as_map(Name) ->
maps:from_list(ets:tab2list(get_table_name(Name, ?META_TABLE_NAME))).

-spec get_remote_meta_data_as_map(atom()) ->map().
get_remote_meta_data_as_map(Name) ->
maps:from_list(ets:tab2list(get_table_name(Name, ?REMOTE_META_TABLE_NAME))).

-spec delete_meta_data_table(atom()) -> true.
delete_meta_data_table(Name) ->
ets:delete(get_table_name(Name, ?META_TABLE_NAME)).

-spec delete_meta_data_sender_table(atom()) -> true.
delete_meta_data_sender_table(Name) ->
ets:delete(get_table_name(Name, ?META_DATA_SENDER_TABLE_NAME)).

-spec delete_remote_meta_data_table(atom()) -> true.
delete_remote_meta_data_table(Name) ->
ets:delete(get_table_name(Name, ?REMOTE_META_TABLE_NAME)).

-spec insert_remote_meta_data(ets:tab(), atom(), any()) -> true.
insert_remote_meta_data(Table, NodeId, Data) ->
ets:insert(Table, {NodeId, Data}).

-spec insert_remote_meta_data_new(ets:tab(), atom(), any()) -> true.
insert_remote_meta_data_new(Table, NodeId, Data) ->
ets:insert_new(Table, {NodeId, Data}).

-spec delete_remote_meta_data_node(ets:tab(), atom()) -> true.
delete_remote_meta_data_node(Table, NodeId) ->
ets:delete(Table, NodeId).


%%%===================================================================
%%% Internal Functions
%%%===================================================================

%% @private
-spec get_table_name(atom(), atom()) -> atom().
get_table_name(Name, TableName) ->
list_to_atom(atom_to_list(Name) ++ atom_to_list(TableName) ++ atom_to_list(node())).
125 changes: 125 additions & 0 deletions src/antidote_ets_txn_caches.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
%% -------------------------------------------------------------------
%%
%% Copyright <2013-2020> <
%% Technische Universität Kaiserslautern, Germany
%% Université Pierre et Marie Curie / Sorbonne-Université, France
%% Universidade NOVA de Lisboa, Portugal
%% Université catholique de Louvain (UCL), Belgique
%% INESC TEC, Portugal
%% >
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either expressed or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% List of the contributors to the development of Antidote: see AUTHORS file.
%% Description and complete License: see LICENSE file.
%% -------------------------------------------------------------------
%%
%% prepared_tx table: the prepared txn for each key. Note that for
%% each key, there can be at most one prepared txn in any
%% time.

-module(antidote_ets_txn_caches).

-include("antidote.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("kernel/include/logger.hrl").


-export([has_prepared_txns_cache/1,
get_prepared_txns_by_key/2,
get_prepared_txn_by_key_and_table/2,
create_prepared_txns_cache/1,
delete_prepared_txns_cache/1,
is_prepared_txn_by_table/2,
delete_prepared_txn_by_table/2,
insert_prepared_txn_by_table/3,
get_prepared_cache_name/1]).

%%%===================================================================
%%% API
%%%===================================================================

-spec has_prepared_txns_cache(partition_id()) -> boolean().
has_prepared_txns_cache(Partition) ->
case ets:info(get_prepared_cache_name(Partition)) of
undefined -> false;
_ -> true
end.

-spec get_prepared_txns_by_key(partition_id(), key()) -> list().
get_prepared_txns_by_key(Partition, Key) ->
get_prepared_txn_by_key_and_table(get_prepared_cache_name(Partition), Key).

-spec get_prepared_txn_by_key_and_table(cache_id(), key()) -> list().
get_prepared_txn_by_key_and_table(Table, Key) ->
case ets:lookup(Table, Key) of
[] ->
[];
[{Key, List}] ->
List
end.

-spec is_prepared_txn_by_table(cache_id(), key()) -> boolean().
is_prepared_txn_by_table(Table, Key) ->
case ets:lookup(Table, Key) of
[] ->
true;
_ ->
false
end.

-spec delete_prepared_txn_by_table(cache_id(), key()) -> true.
delete_prepared_txn_by_table(Table, Key) ->
ets:delete(Table, Key).

-spec insert_prepared_txn_by_table(cache_id(), key(), list()) -> true.
insert_prepared_txn_by_table(Table, Key, List) ->
ets:insert(Table, {Key, List}).

-spec create_prepared_txns_cache(partition_id()) -> cache_id().
create_prepared_txns_cache(Partition) ->
case has_prepared_txns_cache(Partition) of
false ->
ets:new(get_prepared_cache_name(Partition),
[set, protected, named_table, ?TABLE_CONCURRENCY]);
true ->
%% Other vnode hasn't finished closing tables
?LOG_DEBUG("Unable to open ets table in clocksi vnode, retrying"),
timer:sleep(100),
delete_prepared_txns_cache(Partition),
create_prepared_txns_cache(Partition)
end.

-spec delete_prepared_txns_cache(partition_id()) -> true.
delete_prepared_txns_cache(Partition) ->
try
ets:delete(get_prepared_cache_name(Partition))
catch
_:Reason ->
?LOG_ERROR("Error closing table ~p", [Reason]),
true
end.

%%%===================================================================
%%% Internal Functions
%%%===================================================================

-spec get_prepared_cache_name(partition_id()) -> cache_id().
get_prepared_cache_name(Partition) ->
get_cache_name(Partition, prepared).

-spec get_cache_name(partition_id(), atom()) -> cache_id().
get_cache_name(Partition, Base) ->
list_to_atom(atom_to_list(node()) ++ atom_to_list(Base) ++ "-" ++ integer_to_list(Partition)).
17 changes: 8 additions & 9 deletions src/clocksi_readitem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ async_read_data_item({Partition, Node}, Key, Type, Transaction, PropertyList, {f
perform_read_internal(Key, Type, Transaction, PropertyList, Partition) ->
TxId = Transaction#transaction.txn_id,
TxLocalStartTime = TxId#tx_id.local_start_time,
PreparedCache = clocksi_vnode:get_cache_name(Partition, prepared),
case check_clock(Key, TxLocalStartTime, PreparedCache, Partition) of
case check_clock(Key, TxLocalStartTime, Partition) of
{not_ready, Time} ->
timer:sleep(Time),
perform_read_internal(Key, Type, Transaction, PropertyList, Partition);
Expand All @@ -89,24 +88,24 @@ perform_read_internal(Key, Type, Transaction, PropertyList, Partition) ->
%% if local clock is behind, it sleeps the fms until the clock
%% catches up. CLOCK-SI: clock skew.
%%
-spec check_clock(key(), clock_time(), atom(), partition_id()) ->
-spec check_clock(key(), clock_time(), partition_id()) ->
{not_ready, clock_time()} | ready.
check_clock(Key, TxLocalStartTime, PreparedCache, Partition) ->
check_clock(Key, TxLocalStartTime, Partition) ->
Time = dc_utilities:now_microsec(),
case TxLocalStartTime > Time of
true ->
{not_ready, (TxLocalStartTime - Time) div 1000 +1};
false ->
check_prepared(Key, TxLocalStartTime, PreparedCache, Partition)
check_prepared(Key, TxLocalStartTime, Partition)
end.

%% @doc check_prepared: Check if there are any transactions
%% being prepared on the tranaction being read, and
%% being prepared on the transaction being read, and
%% if they could violate the correctness of the read
-spec check_prepared(key(), clock_time(), atom(), partition_id()) ->
-spec check_prepared(key(), clock_time(), partition_id()) ->
ready | {not_ready, ?SPIN_WAIT}.
check_prepared(Key, TxLocalStartTime, PreparedCache, Partition) ->
{ok, ActiveTxs} = clocksi_vnode:get_active_txns_key(Key, Partition, PreparedCache),
check_prepared(Key, TxLocalStartTime, Partition) ->
{ok, ActiveTxs} = clocksi_vnode:get_active_txns_key(Key, Partition),
check_prepared_list(Key, TxLocalStartTime, ActiveTxs).

-spec check_prepared_list(key(), clock_time(), [{txid(), clock_time()}]) ->
Expand Down
Loading