Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

support PUBLISH and SUBSCRIBE

  • Loading branch information...
commit 43d8ce198d1325519dd318b499acc3e91d47c2e4 1 parent 60dc558
Ery Lee authored January 09, 2012
5  src/apps/emqtt/include/emqtt.hrl
@@ -60,6 +60,11 @@
60 60
   arg
61 61
 }).
62 62
 
  63
+-record(topic, {
  64
+  name,
  65
+  sub 
  66
+}).
  67
+
63 68
 -record(sub, {
64 69
   topic,
65 70
   qos = 0
7  src/apps/emqtt/src/emqtt.erl
@@ -63,10 +63,13 @@
63 63
                    [{description, "core initialized"},
64 64
                     {requires,    kernel_ready}]}).
65 65
 
66  
--emqtt_boot_step({networking,
67  
-                   [{mfa,         {emqtt_networking, boot, []}},
  66
+-emqtt_boot_step({router,
  67
+                   [{mfa,         {emqtt_router, boot, []}},
68 68
                     {requires,    core_initialized}]}).
69 69
 
  70
+-emqtt_boot_step({networking,
  71
+                   [{mfa,         {emqtt_networking, boot, []}},
  72
+                    {requires,    router}]}).
70 73
 
71 74
 %%---------------------------------------------------------------------------
72 75
 
29  src/apps/emqtt/src/emqtt_client.erl
@@ -27,6 +27,8 @@ start_link(Sock) ->
27 27
 	gen_server2:start_link(?MODULE, [Sock], []).
28 28
 
29 29
 init([Sock]) ->
  30
+	{A1,A2,A3} = now(),
  31
+	random:seed(A1, A2, A3),
30 32
     {ok, #state{sock = Sock}}.
31 33
 
32 34
 %%--------------------------------------------------------------------
@@ -47,10 +49,27 @@ handle_call(_Req, _From, State) ->
47 49
 %% Description: Handling cast messages
48 50
 %%--------------------------------------------------------------------
49 51
 handle_cast(#mqtt_packet{type = ?CONNECT}, #state{sock = Sock} = State) ->
50  
-	Reply = #mqtt_packet{type = ?CONNACK, arg = 0},
51  
-	send(Reply, Sock),
  52
+	send(#mqtt_packet{type = ?CONNACK, arg = 0}, Sock),
52 53
     {noreply, State};
53 54
 
  55
+handle_cast(#mqtt_packet{type = ?PUBLISH, arg={Topic, Msg}}, State) ->
  56
+	emqtt_router:publish(Topic, Msg),
  57
+    {noreply, State};
  58
+
  59
+handle_cast(#mqtt_packet{type = ?SUBSCRIBE, arg=Subs}, #state{sock = Sock} = State) ->
  60
+	[emqtt_router:subscribe(Topic, self()) ||
  61
+		#sub{topic = Topic} <- Subs],
  62
+	MsgId = random:uniform(16#FFFF),
  63
+	send(#mqtt_packet{type = ?SUBACK, arg={MsgId, Subs}}, Sock),
  64
+    {noreply, State};
  65
+
  66
+handle_cast(#mqtt_packet{type = ?PINGREQ}, #state{sock = Sock} = State) ->
  67
+	send(#mqtt_packet{type = ?PINGRESP}, Sock),
  68
+    {noreply, State};
  69
+
  70
+handle_cast(#mqtt_packet{type = ?DISCONNECT}, State) ->
  71
+    {stop, normal, State};
  72
+
54 73
 handle_cast(Msg, State) ->
55 74
 	io:format("badmsg: ~p~n", [Msg]),
56 75
     {noreply, State}.
@@ -61,6 +80,10 @@ handle_cast(Msg, State) ->
61 80
 %%                                       {stop, Reason, State}
62 81
 %% Description: Handling all non call/cast messages
63 82
 %%--------------------------------------------------------------------
  83
+handle_info({publish, Topic, Msg}, #state{sock = Sock} = State) ->
  84
+	send(#mqtt_packet{type = ?PUBLISH, arg={Topic, Msg}}, Sock),
  85
+    {noreply, State};
  86
+
64 87
 handle_info(_Info, State) ->
65 88
     {noreply, State}.
66 89
 
@@ -72,6 +95,7 @@ handle_info(_Info, State) ->
72 95
 %% The return value is ignored.
73 96
 %%--------------------------------------------------------------------
74 97
 terminate(_Reason, _State) ->
  98
+	emqtt_router:unsubscribe(self()),
75 99
     ok.
76 100
 
77 101
 %%--------------------------------------------------------------------
@@ -82,6 +106,7 @@ code_change(_OldVsn, State, _Extra) ->
82 106
     {ok, State}.
83 107
 
84 108
 send(#mqtt_packet{} = Message, Socket) ->
  109
+  io:format("Message Sent: ~p~n", [Message]),
85 110
 %%?LOG({mqtt_core, send, pretty(Message)}),
86 111
   {VariableHeader, Payload} = emqtt_packet:encode_message(Message),
87 112
   ok = send(emqtt_packet:encode_fixed_header(Message), Socket),
7  src/apps/emqtt/src/emqtt_mnesia.erl
@@ -142,7 +142,12 @@ table_definitions() ->
142 142
       [{record_name, listener},
143 143
        {attributes, record_info(fields, listener)},
144 144
        {type, bag},
145  
-       {match, #listener{_='_'}}]}
  145
+       {match, #listener{_='_'}}]},
  146
+	 {topic,%emqtt_topic
  147
+      [{record_name, topic},
  148
+       {attributes, record_info(fields, topic)},
  149
+       {type, bag},
  150
+       {match, #topic{_='_'}}]}
146 151
 	] ++ gm:table_definitions().
147 152
 
148 153
 table_names() ->
36  src/apps/emqtt/src/emqtt_reader.erl
@@ -96,13 +96,15 @@ recvloop(Socket, ClientPid) ->
96 96
   FixedHeader = recv(1, Socket),
97 97
   io:format("fixed header: ~p~n", [FixedHeader]),
98 98
   RemainingLength = recv_length(Socket),
99  
-  io:format("Length: ~p~n", [RemainingLength]),
100 99
   Rest = recv(RemainingLength, Socket),
101 100
   Header = emqtt_packet:decode_fixed_header(FixedHeader),
102 101
   Message = emqtt_packet:decode_message(Header, Rest),
103  
-  io:format("Message: ~p~n", [Message]),
  102
+  io:format("Message Recved: ~p~n", [Message]),
104 103
   gen_server:cast(ClientPid, Message),
105  
-  recvloop(Socket, ClientPid).
  104
+  case Message of
  105
+  #mqtt_packet{type = ?DISCONNECT} -> stop;
  106
+  _ -> recvloop(Socket, ClientPid)
  107
+  end.
106 108
 
107 109
 recv_length(Socket) ->
108 110
   recv_length(recv(1, Socket), 1, 0, Socket).
@@ -111,13 +113,6 @@ recv_length(<<0:1, Length:7>>, Multiplier, Value, _Socket) ->
111 113
 recv_length(<<1:1, Length:7>>, Multiplier, Value, Socket) ->
112 114
   recv_length(recv(1, Socket), Multiplier * 128, Value + Multiplier * Length, Socket).
113 115
 
114  
-send_length(Length, Socket) when Length div 128 > 0 ->
115  
-  Digit = Length rem 128,
116  
-  send(<<1:1, Digit:7/big>>, Socket),
117  
-  send_length(Length div 128, Socket);
118  
-send_length(Length, Socket) ->
119  
-  Digit = Length rem 128,
120  
-  send(<<0:1, Digit:7/big>>, Socket).
121 116
 
122 117
 recv(0, _Socket) ->
123 118
   <<>>;
@@ -130,27 +125,6 @@ recv(Length, Socket) ->
130 125
       exit(Reason)
131 126
   end.
132 127
 
133  
-send(#mqtt_packet{} = Message, Socket) ->
134  
-%%?LOG({mqtt_packet_core, send, pretty(Message)}),
135  
-  {VariableHeader, Payload} = emqtt_packet:encode_message(Message),
136  
-  ok = send(emqtt_packet:encode_fixed_header(Message), Socket),
137  
-  ok = send_length(size(VariableHeader) + size(Payload), Socket),
138  
-  ok = send(VariableHeader, Socket),
139  
-  ok = send(Payload, Socket),
140  
-  ok;
141  
-send(<<>>, _Socket) ->
142  
-%%?LOG({send, no_bytes}),
143  
-  ok;
144  
-send(Bytes, Socket) when is_binary(Bytes) ->
145  
-%%?LOG({send,bytes,binary_to_list(Bytes)}),
146  
-  case gen_tcp:send(Socket, Bytes) of
147  
-    ok ->
148  
-      ok;
149  
-    {error, Reason} ->
150  
-      ?LOG({send, socket, error, Reason}),
151  
-      exit(Reason)
152  
-  end.
153  
-
154 128
 pretty(Message) when is_record(Message, mqtt_packet) ->
155 129
   lists:flatten(
156 130
     io_lib:format(
62  src/apps/emqtt/src/emqtt_router.erl
@@ -2,16 +2,25 @@
2 2
 %%% File    : emqtt_router.erl
3 3
 %%% Author  : Ery Lee <ery.lee@gmail.com>
4 4
 %%% Purpose : 
5  
-%%% Created : 28 Dec. 2011
6  
-%%% License : http://www.opengoss.com
  5
+%%% Created : 03 Apr. 2010
  6
+%%% License : http://www.monit.cn/license
7 7
 %%%
8  
-%%% Copyright (C) 2012, www.opengoss.com
  8
+%%% Copyright (C) 2007-2010, www.monit.cn
9 9
 %%%----------------------------------------------------------------------
10 10
 -module(emqtt_router).
11 11
 
12 12
 -author('ery.lee@gmail.com').
13 13
 
14  
--behavior(gen_server).
  14
+-include("emqtt.hrl").
  15
+
  16
+-export([boot/0,
  17
+		dump/0,
  18
+		subscribe/2,
  19
+		unsubscribe/1,
  20
+		unsubscribe/2,
  21
+		publish/2]).
  22
+
  23
+-behavior(gen_server2).
15 24
 
16 25
 -export([start_link/0]).
17 26
 
@@ -24,12 +33,40 @@
24 33
 
25 34
 -record(state, {}).
26 35
 
  36
+boot() ->
  37
+	{ok, _} = supervisor:start_child(
  38
+			emqtt_sup, 
  39
+			{emqtt_router, 
  40
+			 {emqtt_router, start_link, []}, 
  41
+			 permanent, infinity, worker, [emqtt_router]}
  42
+	),
  43
+	ok.
27 44
 %%--------------------------------------------------------------------
28 45
 %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
29 46
 %% Description: Starts the server
30 47
 %%--------------------------------------------------------------------
31 48
 start_link() ->
32  
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  49
+    gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
  50
+
  51
+
  52
+dump() ->
  53
+	Keys = mnesia:dirty_all_keys(topic),
  54
+	Topics = lists:flatten([mnesia:dirty_read(topic, Key) || Key <- Keys]), 
  55
+	[io:format("topic: ~p, sub: ~p~n", [Name, Pid]) 
  56
+		|| #topic{name = Name, sub = Pid} <- Topics].
  57
+
  58
+subscribe(Topic, Pid) ->
  59
+	gen_server2:call(?MODULE, {subscribe, Topic, Pid}).
  60
+
  61
+unsubscribe(Topic, Pid) ->
  62
+	gen_server2:cast(?MODULE, {unsubscribe, Topic, Pid}).
  63
+
  64
+unsubscribe(Pid) ->
  65
+	gen_server2:cast(?MODULE, {unsubscribe, Pid}).
  66
+
  67
+publish(Topic, Msg) ->
  68
+	Subscribers = mnesia:dirty_read(topic, Topic),
  69
+	[Sub ! {publish, Topic, Msg} || #topic{sub = Sub} <- Subscribers].
33 70
 
34 71
 %%====================================================================
35 72
 %% gen_server callbacks
@@ -52,14 +89,28 @@ init([]) ->
52 89
 %%                                      {stop, Reason, State}
53 90
 %% Description: Handling call messages
54 91
 %%--------------------------------------------------------------------
  92
+handle_call({subscribe, Topic, Pid}, _From, State) ->
  93
+	Reply = mnesia:dirty_write(#topic{name = Topic, sub = Pid}),
  94
+    {reply, Reply, State};
  95
+
55 96
 handle_call(_Req, _From, State) ->
56 97
     {reply, {error, badreq}, State}.
  98
+
57 99
 %%--------------------------------------------------------------------
58 100
 %% Function: handle_cast(Msg, State) -> {noreply, State} |
59 101
 %%                                      {noreply, State, Timeout} |
60 102
 %%                                      {stop, Reason, State}
61 103
 %% Description: Handling cast messages
62 104
 %%--------------------------------------------------------------------
  105
+handle_cast({unsubscribe, Topic, Pid}, State) ->
  106
+	mnesia:dirty_delete_object(#topic{name = Topic, sub = Pid}),
  107
+    {noreply, State};
  108
+
  109
+handle_cast({unsubscribe, Pid}, State) ->
  110
+	Subscribers = mnesia:dirty_match_object(#topic{sub=Pid}),
  111
+	[mnesia:dirty_delete(Sub) || Sub <- Subscribers],
  112
+    {noreply, State};
  113
+
63 114
 handle_cast(_Msg, State) ->
64 115
     {noreply, State}.
65 116
 
@@ -92,4 +143,3 @@ code_change(_OldVsn, State, _Extra) ->
92 143
 %%% Internal functions
93 144
 %%--------------------------------------------------------------------
94 145
 
95  
-

0 notes on commit 43d8ce1

Please sign in to comment.
Something went wrong with that request. Please try again.