Skip to content

Commit

Permalink
merging upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
boorad committed May 2, 2009
2 parents 0549aea + 4ad10b7 commit 9601f4a
Show file tree
Hide file tree
Showing 17 changed files with 194 additions and 51 deletions.
7 changes: 6 additions & 1 deletion Rakefile
Expand Up @@ -213,6 +213,7 @@ end
DRIVERS = FileList['c/*_drv.c'].pathmap("%{c,priv}X.so")
BEAMS = FileList['elibs/*.erl'].pathmap("%{elibs,ebin}X.beam")
TEST_BEAMS = FileList['etest/*.erl'].select {|d| d !~ /^.*_test.erl$/}.pathmap("%{etest,ebin}X.beam")
THRIFT_BEAMS = FileList['gen-erl/*.erl'].pathmap("%{gen-erl,ebin}X.beam")

directory "build"
directory "priv"
Expand Down Expand Up @@ -249,5 +250,9 @@ rule ".beam" => "%{ebin,etest}X.erl" do |t|
compile(t)
end

rule ".beam" => "%{ebin,gen-erl}X.erl" do |t|
compile(t)
end

task :build_c_drivers => [:c_env, "priv"] + DRIVERS
task :build_erl => BEAMS + TEST_BEAMS
task :build_erl => BEAMS + TEST_BEAMS + THRIFT_BEAMS
29 changes: 25 additions & 4 deletions c/bloom.c
Expand Up @@ -29,8 +29,11 @@ bloom_t *bloom_open(char* filename, long n, double e) {
uint32_t version;
struct stat file_stat;

// printf("sizeof(bloom_data_t) %d\n", sizeof(bloom_data_t));

if (-1 == stat(filename, &file_stat)) {
//create a new one
// printf("creating new file\n");
if (-1 == (file = open(filename, O_CREAT | O_RDWR, S_IWUSR | S_IRUSR | S_IRGRP | S_IWGRP))) {
return NULL;
}
Expand All @@ -44,17 +47,31 @@ bloom_t *bloom_open(char* filename, long n, double e) {

bloom->data.m = m;
bloom->data.k = (int) round(log(2) * m / n);
pwrite(file, bloom, sizeof(bloom_t) + BYTE_SIZE(m), 0);
pwrite(file, &bloom->data, sizeof(bloom_data_t) + BYTE_SIZE(m), 0);
} else {
// printf("opening existing file\n");
if (-1 == (file = open(filename, O_RDWR))) {
return NULL;
}

pread(file, &version, sizeof(uint32_t), 0);
pread(file, &m, sizeof(uint32_t), sizeof(uint32_t));
pread(file, &m, sizeof(uint32_t), 4);

// printf("read version of %d\n", version);
// printf("read m of %d\n", m);
bloom = malloc(sizeof(bloom_t) + BYTE_SIZE(m));
pread(file, &bloom->data, sizeof(bloom_data_t) + BYTE_SIZE(m), 0);
}
// printf("bloom->data %d\n", (int)&bloom->data);
// printf("n %d\n", (int)&bloom->data.n);
// printf("keys %d\n", (int)&bloom->data.keys);
// printf("version = %d %d\n", bloom->data.version, (( int)&(bloom->data.version) - ( int)&(bloom->data)));
// printf("m = %d %d\n", bloom->data.m, (( int)&(bloom->data.m) - ( int)&(bloom->data)));
// printf("n = %d %d\n", bloom->data.n, (( int)&(bloom->data.n) - ( int)&(bloom->data)));
// printf("e = %f %d\n", bloom->data.e, (( int)&(bloom->data.e) - ( int)&(bloom->data)));
// printf("k = %d %d\n", bloom->data.k, (( int)&(bloom->data.k) - ( int)&(bloom->data)));
// printf("keys = %d %d\n", bloom->data.keys, (( int)&(bloom->data.keys) - ( int)&(bloom->data)));
// printf("seed = %d %d\n", bloom->data.seed, (( int)&(bloom->data.seed) - ( int)&(bloom->data)));
bloom->file = file;
bloom->filename = malloc(strlen(filename) + 1);
strcpy(bloom->filename, filename);
Expand All @@ -77,11 +94,15 @@ void bloom_put(bloom_t *bloom, char *buff, int len) {
// printf("byte %d bit %d\n", BYTE_INDEX(index), BIT_INDEX(index));
byte_index = BYTE_INDEX(index);
SET_BIT(bloom->data.bits, index);
pwrite(bloom->file, &bloom->data.bits[byte_index], 1, sizeof(bloom_t) + byte_index - 1);
// if ((sizeof(bloom_data_t) + byte_index - 1) < 108) {
// printf("writing to %d\n", sizeof(bloom_data_t) + byte_index - 1);
offset = (unsigned int)&(bloom->data.bits[byte_index]) - (unsigned int)&bloom->data;
pwrite(bloom->file, &bloom->data.bits[byte_index], 1, offset);
// printf("byte %d\n", bloom->bits[BYTE_INDEX(index)]);
}
bloom->data.keys++;
offset = ((unsigned int)&(bloom->data.keys) - (unsigned int)bloom);
offset = ((unsigned int)&(bloom->data.keys) - (unsigned int)&(bloom->data));
// printf("writing keys into offset %d\n", offset);
pwrite(bloom->file, &(bloom->data.keys), sizeof(uint32_t), offset);
}

Expand Down
12 changes: 6 additions & 6 deletions c/bloom.h
@@ -1,11 +1,11 @@

typedef struct _bloom_data_t {
uint32_t version;
uint32_t m;
uint64_t n;
double e;
uint32_t k;
uint64_t keys;
uint32_t version; //0
uint32_t m; //4
uint64_t n; //8
double e; //
uint32_t k; //
uint64_t keys; //3
uint32_t seed;
char reserved[64];
char bits[1];
Expand Down
10 changes: 9 additions & 1 deletion c/bloom_drv.c
Expand Up @@ -87,14 +87,22 @@ static void output(ErlDrvData handle, char *buf, int len) {
static void setup(bloom_drv_t *driver, char *buf, int len) {
long n;
double e;
char *filename;
int size;
int type;
int index = 0;

ei_decode_version(buf, &index, NULL);
ei_decode_tuple_header(buf, &index, NULL);
ei_get_type(buf, &index, &type, &size);
filename = driver_alloc(size+1);
ei_decode_string(buf, &index, filename);
ei_decode_long(buf, &index, &n);
ei_decode_double(buf, &index, &e);

driver->bloom = bloom_create(n, e);
driver->bloom = bloom_open(filename, n, e);

driver_free(filename);
}

static void put(bloom_drv_t *driver, char *buf, int len) {
Expand Down
2 changes: 1 addition & 1 deletion ebin/dynomite.app
Expand Up @@ -4,7 +4,7 @@
{vsn, "?VERSION"},
{modules,
[
block_server, bootstrap, commands, configuration, couch_btree, couch_file, dets_storage, dict_storage, dmerkle,
block_server, bootstrap, commands, configuration, dets_storage, dict_storage, dmerkle,
dmtree, dynomite_pb, dummy_server, dynomite, dynomite_app, dynomite_prof, dynomite_sup, dynomite_thrift_service, dynomite_web, fail_storage, fnv, fs_storage,
lib_misc, mediator, membership, mnesia_storage, murmur, partitions, rate, socket_server, stats_server, storage_manager,
storage_server, storage_server_sup, stream, sync_manager, sync_server, sync_server_sup, tc_storage, ulimit, vector_clock,
Expand Down
6 changes: 3 additions & 3 deletions elibs/bloom.erl
Expand Up @@ -12,7 +12,7 @@
-author('cliff@powerset.com').

%% API
-export([start/2, put/2, has/2, mem_size/1, key_size/1, stop/1]).
-export([start/3, put/2, has/2, mem_size/1, key_size/1, stop/1]).

%% COMMANDS
-define(SETUP, $s).
Expand All @@ -34,11 +34,11 @@
%% @end
%%--------------------------------------------------------------------

start(N, E) ->
start(Filename, N, E) ->
case load_driver() of
ok ->
P = open_port({spawn, 'bloom_drv'}, [binary]),
port_command(P, [?SETUP, term_to_binary({N, E})]),
port_command(P, [?SETUP, term_to_binary({Filename, N, E})]),
{ok, {bloom, P}};
{error, Err} ->
Msg = erl_ddll:format_error(Err),
Expand Down
4 changes: 2 additions & 2 deletions elibs/dmerkle.erl
Expand Up @@ -37,7 +37,7 @@ open(FileName) ->
open(FileName, undefined).

open(FileName, BlockSize) ->
gen_server:start_link(?MODULE, {FileName,BlockSize}, []).
gen_server:start_link(?MODULE, {FileName,BlockSize}, [{spawn_opt, [{fullsweep_after, 10}]}]).

count_trace(Pid, Key) ->
gen_server:call(Pid, {count_trace, Key}).
Expand Down Expand Up @@ -70,7 +70,7 @@ deletea(Key, Tree) ->
gen_server:cast(Tree, {delete, Key}).

key_diff(TreeA, TreeB) ->
gen_server:call(TreeA, {key_diff, TreeB}).
gen_server:call(TreeA, {key_diff, TreeB}, infinity).

close(Tree) ->
gen_server:cast(Tree, close).
Expand Down
17 changes: 15 additions & 2 deletions elibs/dynomite.erl
Expand Up @@ -7,19 +7,32 @@
start() ->
crypto:start(),
load_and_start_apps([os_mon, thrift, mochiweb, dynomite]).
<<<<<<< HEAD:elibs/dynomite.erl

running(Node) when Node == node() ->
true;

=======

% running(Node) when Node == node() ->
% true;

>>>>>>> cliff/master:elibs/dynomite.erl
running(Node) ->
Ref = erlang:monitor(process, {membership, Node}),
receive
R = receive
{'DOWN', Ref, _, _, _} -> false
after 1 ->
erlang:demonitor(Ref),
true
<<<<<<< HEAD:elibs/dynomite.erl
end.

=======
end,
erlang:demonitor(Ref),
R.

>>>>>>> cliff/master:elibs/dynomite.erl
running_nodes() ->
[Node || Node <- nodes([this,visible]), dynomite:running(Node)].

Expand Down
58 changes: 57 additions & 1 deletion elibs/dynomite_rpc.erl
Expand Up @@ -7,11 +7,13 @@

connect(Node) ->
case net_adm:ping(Node) of
pong -> {ok, Node};
pong ->
{ok, Node};
pang -> {error, "Cannot connect."}
end.

get(Node, Key) ->
<<<<<<< HEAD:elibs/dynomite_rpc.erl
case rpc:call(Node, mediator, get, [Key]) of
{badrpc, Reason} -> {failure, Reason};
Result -> Result
Expand All @@ -36,3 +38,57 @@ delete(Node, Key) ->
end.

close(Node) -> erlang:disconnect(Node).
=======
GetFun = fun(N) ->
case rpc:call(N, mediator, get, [Key]) of
{badrpc, Reason} -> {failure, Reason};
Result -> Result
end
end,
robustify(Node, GetFun).

put(Node, Key, Context, Value) ->
PutFun = fun(N) ->
case rpc:call(N, mediator, put, [Key, Context, Value]) of
{badrpc, Reason} -> {failure, Reason};
Result -> Result
end
end,
robustify(Node, PutFun).

has_key(Node, Key) ->
HasFun = fun(N) ->
case rpc:call(N, mediator, has_key, [Key]) of
{badrpc, Reason} -> {failure, Reason};
Result -> Result
end
end,
robustify(Node, HasFun).

delete(Node, Key) ->
DelFun = fun(N) ->
case rpc:call(N, mediator, delete, [Key]) of
{badrpc, Reason} -> {failure, Reason};
Result -> Result
end
end,
robustify(Node, DelFun).

close(Node) -> erlang:disconnect(Node).


robustify(Node, Fun) ->
erlang:monitor_node(Node, true),
R = receive
{nodedown, Node} ->
% io:format("node ~p was down~n", [Node]),
case dynomite:running_nodes() of
[] -> {failure, "No dynomite nodes available."};
[NextNode|_] -> Fun(NextNode)
end
after 0 ->
Fun(Node)
end,
erlang:monitor_node(Node, false),
R.
>>>>>>> cliff/master:elibs/dynomite_rpc.erl
2 changes: 1 addition & 1 deletion elibs/dynomite_sup.erl
Expand Up @@ -49,7 +49,7 @@ start_link(ConfigFile) ->
%%--------------------------------------------------------------------
init(ConfigFile) ->
Node = node(),
Nodes = dynomite:running_nodes(),
Nodes = dynomite:running_nodes() ++ [node()],
Children = [
{fnv,
{fnv,start,[]},
Expand Down
4 changes: 2 additions & 2 deletions elibs/storage_manager.erl
Expand Up @@ -41,10 +41,10 @@ start_link() ->
gen_server:start_link({local, storage_manager}, ?MODULE, [], []).

load(Nodes, Partitions, PartsForNode) ->
gen_server:call(storage_manager, {load, Nodes, Partitions, PartsForNode, true}).
gen_server:call(storage_manager, {load, Nodes, Partitions, PartsForNode, true}, infinity).

load_no_boot(Nodes, Partitions, PartsForNode) ->
gen_server:call(storage_manager, {load, Nodes, Partitions, PartsForNode, false}).
gen_server:call(storage_manager, {load, Nodes, Partitions, PartsForNode, false}, infinity).

loaded() ->
gen_server:call(storage_manager, loaded).
Expand Down
2 changes: 1 addition & 1 deletion elibs/storage_server.erl
Expand Up @@ -41,7 +41,7 @@
%% @end
%%--------------------------------------------------------------------
start_link(StorageModule, DbKey, Name, Min, Max, BlockSize) when is_list(StorageModule) ->
gen_server:start_link({local, Name}, ?MODULE, {list_to_atom(StorageModule),DbKey,Name,Min,Max, BlockSize}, []);
gen_server:start_link({local, Name}, ?MODULE, {list_to_atom(StorageModule),DbKey,Name,Min,Max, BlockSize}, [{spawn_opt, [{fullsweep_after, 10}]}]);

start_link(StorageModule, DbKey, Name, Min, Max, BlockSize) ->
gen_server:start_link({local, Name}, ?MODULE, {StorageModule,DbKey,Name,Min,Max, BlockSize}, []).
Expand Down
2 changes: 1 addition & 1 deletion elibs/sync_manager.erl
Expand Up @@ -41,7 +41,7 @@ stop() ->
gen_server:cast(sync_manager, stop).

load(Nodes, Partitions, PartsForNode) ->
gen_server:call(sync_manager, {load, Nodes, Partitions, PartsForNode}).
gen_server:call(sync_manager, {load, Nodes, Partitions, PartsForNode}, infinity).

sync(Part, Master, NodeA, NodeB, DiffSize) ->
gen_server:cast(sync_manager, {sync, Part, Master, NodeA, NodeB, DiffSize}).
Expand Down
2 changes: 1 addition & 1 deletion elibs/sync_server.erl
Expand Up @@ -25,7 +25,7 @@
%% @end
%%--------------------------------------------------------------------
start_link(Name, Partition) ->
Pid = spawn_link(fun() ->
Pid = proc_lib:spawn_link(fun() ->
sync_server:loop(#state{name=Name,partition=Partition,paused=false})
end),
register(Name, Pid),
Expand Down

0 comments on commit 9601f4a

Please sign in to comment.