Skip to content
Browse files

New Beginning

now otp based.

functions work, watches not included

supervisor is not working properly
  • Loading branch information...
0 parents commit 78a3f26ff85eff0382762c59634e58c85be64fc6 Marco committed
9 ebin/ezk.app
@@ -0,0 +1,9 @@
+{application,ezk,
+ [{description,[]},
+ {vsn,"1"},
+ {registered,[]},
+ {applications,[kernel,stdlib]},
+ {mod,{ezk_app,[]}},
+ {env,[]},
+ {modules,[ezk_app,ezk_connection,ezk_log,ezk_message_2_packet,
+ ezk_packet_2_message,ezk_sup,testit]}]}.
21 include/ezk.hrl
@@ -0,0 +1,21 @@
+-ifndef(ezk_ezk_HRL).
+-define(ezk_ezk_HRL,1).
+-define(LOG, ezk_log:put).
+
+-record(cstate, {open_requests = dict:new(),
+ socket :: port(),
+ ip,
+ port :: 0..65535,
+ timeout,
+ sessionid,
+ iteration :: pos_integer(),
+ outstanding_heartbeats = 0
+ }).
+
+-record(zkmsg, {cmd :: watchevent | ls | create | delete | get | set,
+ msgid :: pos_integer(),
+ zxid :: pos_integer(),
+ payload :: binary()
+ }).
+
+-endif.
BIN rebar
Binary file not shown.
0 rebar.config
No changes.
12 src/ezk.app.src
@@ -0,0 +1,12 @@
+{application, ezk,
+ [
+ {description, ""},
+ {vsn, "1"},
+ {registered, []},
+ {applications, [
+ kernel,
+ stdlib
+ ]},
+ {mod, { ezk_app, []}},
+ {env, []}
+ ]}.
16 src/ezk_app.erl
@@ -0,0 +1,16 @@
+-module(ezk_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, StartArgs) ->
+ ezk_sup:start_link(StartArgs).
+
+stop(_State) ->
+ ok.
147 src/ezk_connection.erl
@@ -0,0 +1,147 @@
+%%%-------------------------------------------------------------------
+%%% @author Marco <marco@gauss>
+%%% @copyright (C) 2011, Marco
+%%% @doc
+%%%
+%%% @end
+%%% Created : 14 Mar 2011 by Marco <marco@gauss>
+%%%-------------------------------------------------------------------
+-module(ezk_connection).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([create/2, create/3, delete/1, set/2, get/1, ls2/1, ls/1, die/0]).
+-include_lib("../include/ezk.hrl").
+
+-define(SERVER, ?MODULE).
+-define(HEARTBEATTIME, 10000).
+
+start_link(Args) ->
+ ?LOG(1,"Connection: Start link called with Args: ~w",[Args]),
+ gen_server:start_link({local, ?SERVER}, ?MODULE, Args, []).
+
+
+init([Ip, Port, WantedTimeout]) ->
+ ?LOG(1, "Connection: Server starting"),
+ {ok, Socket} = gen_tcp:connect(Ip,Port,[binary,{packet,4}]),
+ HandshakePacket = <<0:64, WantedTimeout:64, 0:64, 16:64, 0:128>>,
+ ok = gen_tcp:send(Socket, HandshakePacket),
+ ok = inet:setopts(Socket,[{active,once}]),
+ receive
+ {tcp,Socket,Reply} ->
+ <<RealTimeout:64, SessionId:64, 16:32, _Hash:128>> = Reply,
+ InitialState = #cstate{
+ socket = Socket, ip = Ip,
+ port = Port, timeout = RealTimeout,
+ sessionid = SessionId, iteration = 1},
+ ok = inet:setopts(Socket,[{active,once}]),
+ ?LOG(1, "Connection: Startup complete",[]),
+ ?LOG(3, "Connection: Initial State : ~w",[InitialState])
+ end,
+ erlang:send_after(?HEARTBEATTIME, self(), heartbeat),
+ {ok, InitialState}.
+
+die() ->
+ erlang:exit(?SERVER, "You stood in my way!").
+
+create(Path, Data) ->
+ gen_server:call(?SERVER, {command, {create, Path, Data, []}}).
+
+create(Path, Data, Typ) ->
+ gen_server:call(?SERVER, {command, {create, Path, Data, Typ}}).
+
+delete(Path) ->
+ gen_server:call(?SERVER, {command, {delete, Path}}).
+
+get(Path) ->
+ gen_server:call(?SERVER, {command, {get, Path}}).
+
+%% get(Path, WatchOwner, WatchMessage) ->
+%% gen_server:cast(?SERVER, {getw, Path, WatchOwner, WatchMessage}).
+
+set(Path, Data) ->
+ gen_server:call(?SERVER, {command, {set, Path, Data}}).
+
+ls(Path) ->
+ gen_server:call(?SERVER, {command, {ls, Path}}).
+
+%% ls(Path, WatchOwner, WatchMessage) ->
+%% gen_server:cast(?SERVER, {lsw, Path ,WatchOwner, WatchMessage}).
+
+ls2(Path) ->
+ gen_server:call(?SERVER, {command, {ls2, Path}}).
+
+%% ls2(Path, WatchOwner, WatchMessage) ->
+%% gen_server:cast(?SERVER, {ls2w, Path ,WatchOwner, WatchMessage}).
+
+handle_call({command, Args}, From, State) ->
+ Iteration = State#cstate.iteration,
+ {ok, CommId, Path, Packet} = ezk_message_2_packet:make_packet(Args, Iteration),
+ gen_tcp:send(State#cstate.socket, Packet),
+ ?LOG(1, "Connection: Packet send"),
+ NewOpen = dict:store(Iteration, {CommId, Path, From}, State#cstate.open_requests),
+ ?LOG(3, "Connection: Saved open Request."),
+ NewState = State#cstate{iteration = Iteration+1, open_requests = NewOpen },
+ ?LOG(3, "Connection: Returning to wait status"),
+ {noreply, NewState}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({tcp, _Port, Info}, State) ->
+ ?LOG(1, "Connection: Got a message from Server"),
+ TypedMessage = ezk_packet_2_message:get_message_typ(Info),
+ ?LOG(3, "Connection: Typedmessage is ~w",[TypedMessage]),
+ case TypedMessage of
+ {heartbeat, _Heartbeat} ->
+ ?LOG(4, "Got a Heartbeat"),
+ Outstanding = State#cstate.outstanding_heartbeats,
+ NewState = State#cstate{outstanding_heartbeats = Outstanding-1},
+ ok = inet:setopts(State#cstate.socket,[{active,once}]),
+ {noreply, NewState};
+ %Watchevents
+ {watchevent} ->
+ ok = inet:setopts(State#cstate.socket,[{active,once}]),
+ {noreply, State};
+ %Other Messages
+ {normal, MessId, _Zxid, PayloadWithErrorcode} ->
+ ?LOG(3, "Connection: Normal Message"),
+ {ok, {CommId, Path, From}} = dict:find(MessId, State#cstate.open_requests),
+ ?LOG(3, "Connection: Found dictonary entry"),
+ NewDict = dict:erase(MessId, State#cstate.open_requests),
+ NewState = State#cstate{open_requests = NewDict},
+ ?LOG(3, "Connection: Dictionary updated"),
+ Reply = ezk_packet_2_message:replymessage_2_reply(CommId, Path,
+ PayloadWithErrorcode),
+ ?LOG(3, "Connection: determinated reply"),
+ gen_server:reply(From, Reply),
+ ok = inet:setopts(State#cstate.socket,[{active,once}]),
+ {noreply, NewState}
+ end;
+
+handle_info(heartbeat, State) ->
+ case State#cstate.outstanding_heartbeats of
+ 0 ->
+ ?LOG(4, "Send a Heartbeat"),
+ Heartbeat = << 255,255,255,254, 11:32>>,
+ gen_tcp:send(State#cstate.socket, Heartbeat),
+ NewState = State#cstate{outstanding_heartbeats = 1},
+ erlang:send_after(?HEARTBEATTIME, self(), heartbeat),
+ {noreply, NewState};
+ _Else ->
+ error
+ end.
+
+terminate(_Reason, _State) ->
+ ?LOG(1,"Connection: TERMINATING"),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
24 src/ezk_log.erl
@@ -0,0 +1,24 @@
+-module(ezk_log).
+-export([put/3, put/2]).
+-define(LEVEL,4).
+%% 0: nothing
+%% 1: important things
+%% 2: also sequenzer things
+%% 3: most things
+%% 4: also heartbeats
+
+put(NeededLevel, Message, Parameter) ->
+ if
+ NeededLevel =< ?LEVEL ->
+ %%error_logger:info_report([{message, Message}, {parameter, Parameter}]),
+ io:format(("Logger: " ++ Message++"~n"), Parameter);
+ true -> {}
+ end.
+
+put(NeededLevel, Message) ->
+ if
+ NeededLevel =< ?LEVEL ->
+ io:format(("Logger: " ++ Message++"~n"));
+ true -> {}
+ end.
+
61 src/ezk_message_2_packet.erl
@@ -0,0 +1,61 @@
+-module(ezk_message_2_packet).
+-include_lib("../include/ezk.hrl").
+-export([make_packet/2]).
+make_packet({create, Path, Data, Typ}, Iteration) ->
+ case Typ of
+ e -> Mode = 1;
+ s -> Mode = 2;
+ _Else -> Mode = 0
+ end,
+ Load = <<(pack_it_l2b(Path))/binary,
+ (pack_it_l2b(Data))/binary,
+ 1:32,
+ 31:32,
+ (pack_it_l2b("world"))/binary,
+ (pack_it_l2b("anyone"))/binary,
+ Mode:32>>,
+ Command = 1,
+ wrap_packet({Command, Path, Load, {none}}, Iteration);
+
+make_packet({delete, Path}, Iteration ) ->
+ Load = <<(pack_it_l2b(Path))/binary, 255, 255, 255, 255 >>,
+ Command = 2,
+ wrap_packet({Command, Path, Load, {none}}, Iteration);
+
+make_packet({get, Path}, Iteration) ->
+ Load = <<(pack_it_l2b(Path))/binary, 0>>,
+ Command = 4,
+ wrap_packet({Command, Path, Load, {none}}, Iteration );
+
+make_packet({set, Path, Data}, Iteration) ->
+ Load = <<(pack_it_l2b(Path))/binary,
+ (pack_it_l2b(Data))/binary,
+ <<255,255,255,255>>/binary>>,
+ Command = 5,
+ wrap_packet({Command, Path, Load, {none}}, Iteration );
+
+make_packet({ls, Path}, Iteration) ->
+ Load = <<(pack_it_l2b(Path))/binary, 0:8>>,
+ Command = 8,
+ wrap_packet({Command, Path, Load, {none}}, Iteration );
+
+make_packet({ls2, Path}, Iteration) ->
+ Load = <<(pack_it_l2b(Path))/binary, 0:8>>,
+ Command = 12,
+ wrap_packet({Command, Path, Load, {none}}, Iteration ).
+
+
+%--------------------------------------------------------------------
+%LIttle Helpers
+%--------------------------------------------------------------------
+pack_it_l2b(List) ->
+ Length = length(List),
+ <<Length:32,(list_to_binary(List))/binary>>.
+
+
+wrap_packet({Command, Path, Load, _WatchData}, Iteration) ->
+ ?LOG(3, "message_2_packet: Try send a request {command, Path, Load}: ~w",
+ [{Command, Path, Load}]),
+ Packet = <<Iteration:32, Command:32, Load/binary>>,
+ ?LOG(3, "message_2_packet: Request send"),
+ {ok, Command, Path, Packet}.
103 src/ezk_packet_2_message.erl
@@ -0,0 +1,103 @@
+-module(ezk_packet_2_message).
+-export([get_message_typ/1, replymessage_2_reply/3]).
+-include_lib("../include/ezk.hrl").
+
+
+get_message_typ(Data) ->
+ case Data of
+ <<255,255,255,254, Heartbeat/binary>> ->
+ {heartbeat, Heartbeat};
+ %%%Watchevents
+ <<4294967295:32, 4294967295:32, 4294967295:32, _Left/binary>> ->
+ ?LOG(3, "packet_2_message: A Watchevent arrived"),
+ {watchevent};
+ %%%Other Messages
+ <<MessId:32, 0:32, Zxid:32, Payload/binary>> ->
+ ?LOG(3, "packet_2_message: A normal Message arrived"),
+ {normal, MessId, Zxid, Payload}
+
+ end.
+
+replymessage_2_reply(CommId, Path, PayloadWithErrorCode) ->
+ ?LOG(1,"packet_2_message: Trying to Interpret payload: ~w", [PayloadWithErrorCode]),
+ case PayloadWithErrorCode of
+ <<0,0,0,0,Payload/binary>> ->
+ ?LOG(1, "packet_2_message: Interpreting the payload"),
+ Replydata = interpret_reply_data(CommId, Path, Payload),
+ Reply = {ok, Replydata};
+ <<255,255,255,146,_Payload/binary>> ->
+ Reply = {error, "Directory already exists"};
+ <<255,255,255,155,_Payload/binary>> ->
+ Reply = {error, "Directory not found"};
+ Cody ->
+ Reply = {unknown, "Wow, you just found an unexpected Error.", Cody}
+ end,
+ Reply.
+
+%%% create
+interpret_reply_data(1, _Path, Reply) ->
+ <<LengthOfData:32, Data/binary>> = Reply,
+ {ReplyPath, _Left} = split_binary(Data, LengthOfData),
+ binary_to_list(ReplyPath);
+
+%%% delete
+interpret_reply_data(2, Path, _Reply) ->
+ Path;
+
+%%% get
+interpret_reply_data(4, _Path, Reply) ->
+ <<LengthOfData:32, Data/binary>> = Reply,
+ {ReplyData, Left} = split_binary(Data, LengthOfData),
+ Parameter = getbinary_2_list(Left),
+ {binary_to_list(ReplyData), Parameter};
+
+%%% set
+interpret_reply_data(5, _Path, Reply) ->
+ getbinary_2_list(Reply);
+
+%%% ls
+%%% Returns Children as a List of binarys
+interpret_reply_data(8, _Path, Reply) ->
+ ?LOG(4,"packet_2_message: Interpreting a ls"),
+ <<NumberOfAnswers:32, Data/binary>> = Reply,
+ ?LOG(4,"packet_2_message: Number of Children: ~w",[NumberOfAnswers]),
+ ?LOG(4,"packet_2_message: The Binary is: ~w",[Data]),
+ {List, _Left} = get_n_paths(NumberOfAnswers, Data),
+ ?LOG(4,"packet_2_message: Paths extracted."),
+ ?LOG(4,"packet_2_message: Paths are: ~w",[List]),
+ lists:map(fun(A) -> list_to_binary(A) end, List);
+
+%%% ls2
+interpret_reply_data(12, _Path, Reply) ->
+ {<<NumberOfAnswers:32>>, Data} = split_binary(Reply, 4),
+ {Children, Left} = get_n_paths(NumberOfAnswers, Data),
+ Parameter = getbinary_2_list(Left),
+ [{children, Children}|Parameter].
+
+%%----------------------------------------------------------------
+%% Little Helpers
+%%----------------------------------------------------------------
+
+get_n_paths(0, Binary) ->
+ {[],Binary};
+get_n_paths(N, Binary) ->
+ {ThisPathBin, ToProcessBin} = unpack(Binary),
+ {RekResult, Left2} = get_n_paths(N-1, ToProcessBin),
+ {[binary_to_list(ThisPathBin) | RekResult ], Left2}.
+
+
+getbinary_2_list(Binary) ->
+ <<0:32, Czxid:32, 0:32, Mzxid:32,
+ Ctime:64, Mtime:64,
+ DaVer:32, CVer:32, 0:96,
+ DaLe:32, NumChi:32, 0:32,
+ Pzxid:32>> = Binary,
+ [{czxid, Czxid}, {mzxid, Mzxid},
+ {ctime, Ctime}, {mtime, Mtime},
+ {dataversion, DaVer}, {datalength, DaLe},
+ {numberChildren,NumChi}, {pzxid, Pzxid},
+ {cversion, CVer}].
+
+unpack(Binary) ->
+ <<Length:32, Load/binary>> = Binary,
+ split_binary(Load, Length).
33 src/ezk_sup.erl
@@ -0,0 +1,33 @@
+
+-module(ezk_sup).
+
+-behaviour(supervisor).
+-include_lib("../include/ezk.hrl").
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+ Args = ["192,168,1,111",2181, 30000],
+ supervisor:start_link({local, ?MODULE}, ?MODULE, Args).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init(Args) ->
+ ?LOG(1,"Supervisor: making Childspec."),
+ ChildSpec = [?CHILD(ezk_connection,worker,Args)],
+ ?LOG(1,"Supervisor: done Childspec: ~w.",[ChildSpec]),
+ {ok, { {one_for_one, 5, 10}, ChildSpec} }.
+
24 src/makefile
@@ -0,0 +1,24 @@
+.SUFFIXES: .erl .beam .yrl
+
+ .erl.beam:
+ erlc -W $<
+
+ .yrl.erl:
+ erlc -W $<
+
+ ERL = erl -boot start_clean
+
+ MODS = ezk_connection ezk_log ezk_message_2_packet ezk_packet_2_message testit
+
+ all: compile
+
+ compile: ${MODS:%=%.beam} moveit
+
+ moveit:
+ mv ./*.beam ../ebin/
+
+ clean:
+ rm -rf *.beam erl_crash.dump
+
+ subdirs:
+ cd src; make
34 src/testit.erl
@@ -0,0 +1,34 @@
+-module(testit).
+
+-export([start/0]).
+-export([tls/0, tls2/0, tg/0, ts/0, tcr/0, tlong/0]).
+
+start() ->
+ {ok, A} = ezk_connection:start_link(["192.168.1.111", 2181, 30000]),
+ A.
+
+tls() ->
+ ezk_connection:ls("/").
+
+tls2() ->
+ ezk_connection:ls2("/").
+
+tg() ->
+ ezk_connection:get("/").
+
+ts() ->
+ ezk_connection:set("/","tester").
+
+tcr() ->
+ ezk_connection:create("/creationtest"),
+ ezk_connection:delete("/creationtest").
+
+tlong() ->
+ {ok, A} = ezk_connection:start_link(["192.168.1.111", 2181, 30000]),
+ io:format("ls: ~w ~n: ",[ezk_connection:ls("/")]),
+ io:format("ls: ~w ~n: ",[ezk_connection:ls("/")]),
+ io:format("ls2: ~w ~n: ",[ezk_connection:ls2("/")]),
+ io:format("get: ~w ~n: ",[ezk_connection:get("/")]),
+ A.
+
+

0 comments on commit 78a3f26

Please sign in to comment.
Something went wrong with that request. Please try again.