Skip to content

Commit

Permalink
dev reference ref process
Browse files Browse the repository at this point in the history
  • Loading branch information
DLive committed Jun 22, 2019
1 parent 1f64b4b commit 216b5a7
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 68 deletions.
4 changes: 2 additions & 2 deletions include/dubbo.hrl
Expand Up @@ -102,8 +102,8 @@


-record(interface_list, {interface, pid, connection_info}).
-record(provider_node_list, {host_flag, pid, weight, readonly = false}).
-record(connection_info, {connection_id, pid, weight, host_flag, readonly = false}).
%%-record(provider_node_list, {host_flag, pid, weight, readonly = false}).
-record(connection_info, {host_flag, pid, weight, readonly = false}).

-type dubbo_request() :: #dubbo_request{}.
-type dubbo_response() :: #dubbo_response{}.
10 changes: 6 additions & 4 deletions include/dubboerl.hrl
Expand Up @@ -14,11 +14,13 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%------------------------------------------------------------------------------
-define(PROVIDER_IMPL_TABLE,provider_impl_table).
-define(PROVIDER_IMPL_TABLE, provider_impl_table).

-define(PROVIDER_WORKER,provider_worker).
-define(PROVIDER_WORKER, provider_worker).

-define(TRAFFIC_CONTROL,traffic_control).
-define(TRAFFIC_CONTROL, traffic_control).


-record(dubbo_url,{scheme,user_info,host,port,path,parameters,fragment}).
-record(dubbo_url, {scheme, user_info, host, port, path, parameters, fragment}).

-record(dubbo_invoker, {host_flag, handle}).
29 changes: 27 additions & 2 deletions src/dubbo_directory.erl
Expand Up @@ -17,7 +17,7 @@
-module(dubbo_directory).

-behaviour(gen_server).

-include("dubboerl.hrl").
-export([subscribe/2,notify/2]).
%% API
-export([start_link/0]).
Expand Down Expand Up @@ -87,8 +87,23 @@ notify(Interface,UrlList)->
%% dubbo_consumer_pool:start_consumer(Interface, UrlList),
ok.


refresh_invoker(UrlList)->
NewInvokers = refresh_invoker(UrlList,[]).
case pick_interface(UrlList) of
{error,Reason}->
fail;
{"empty",Interface}->
todo_destroy;
{_,Interface} ->
OldProviderHosts = dubbo_provider_consumer_reg_table:get_interface_provider_node(Interface),
NewInvokers = refresh_invoker(UrlList,[]),
NewProviderHosts = [Item#dubbo_invoker.host_flag || Item <- NewInvokers],
DeleteProverList = OldProviderHosts -- NewProviderHosts,
dubbo_provider_consumer_reg_table:clean_invalid_provider(DeleteProverList)

end.
%% OldProviderHosts =


refresh_invoker([Url|Rest],Acc)->
case dubbo_extension:run_fold(protocol,refer,[Url],undefined) of
Expand All @@ -100,6 +115,16 @@ refresh_invoker([Url|Rest],Acc)->
refresh_invoker(Rest,Acc)
end.

pick_interface([Url | _]) ->
case dubbo_common_fun:parse_url(Url) of
{ok,UrlInfo}->
Interface = maps:get("interface",UrlInfo#dubbo_url.parameters),
{UrlInfo#dubbo_url.scheme,Interface};
{error,Reason} ->
{error,Reason}
end.


%%--------------------------------------------------------------------
%% @private
%% @doc
Expand Down
18 changes: 6 additions & 12 deletions src/dubbo_exchanger.erl
Expand Up @@ -21,20 +21,14 @@
%% API
-export([connect/2]).

connect(Url,Handler) ->
case dubbo_node_config_util:parse_provider_info(Url) of
{ok, ProviderConfig} ->
HostFlag= dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig),
{ok, Pid} = dubbo_transport_pool_sup:add_children(ProviderConfig,Handler),
logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]),
{ok,#connection_info{ pid = Pid, weight = get_weight(ProviderConfig), host_flag = HostFlag}};
{error, R1} ->
logger:error("parse provider info error reason ~p", [R1]),
{error,R1}
end.
connect(ProviderConfig, Handler) ->
HostFlag = dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig),
{ok, Pid} = dubbo_transport_pool_sup:add_children(ProviderConfig, Handler),
logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]),
{ok, #connection_info{pid = Pid, weight = get_weight(ProviderConfig), host_flag = HostFlag}}.



get_weight(_ProviderConfig)->
get_weight(_ProviderConfig) ->
%% todo get weight from provider info
30.
69 changes: 38 additions & 31 deletions src/dubbo_protocol_dubbo.erl
Expand Up @@ -26,50 +26,57 @@ refer(Url, Acc) ->
{ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
case UrlInfo#dubbo_url.scheme of
<<"dubbo">> ->
do_refer(UrlInfo),
{ok, todo};
{ok,Invoker} = do_refer(UrlInfo),
{ok, Invoker};
_ ->
{skip, Acc}
end.

do_refer(UrlInfo) ->

ok.


getClients(ProviderUrl) ->
case new_transport(ProviderUrl) of
{ok,ConnectionInfoList} ->
ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig),
ok;
{error,Reason} ->
{error,Reason}
case dubbo_node_config_util:parse_provider_info(UrlInfo) of
{ok, ProviderConfig} ->
%% OldHostList = dubbo_provider_consumer_reg_table:get_interface_provider_node(ProviderConfig#provider_config.interface),
case getClients(ProviderConfig) of
{ok, ConnectionInfoList} ->
dubbo_provider_consumer_reg_table:update_node_conections(ProviderConfig#provider_config.interface,ConnectionInfoList),
HostFlag = dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig),
{ok,#dubbo_invoker{host_flag = HostFlag,handle = ?MODULE}};
{error, Reason} ->
{error, Reason}
end;
{error, R1} ->
logger:error("parse provider info error reason ~p", [R1]),
{error, R1}
end.

getClients(ProviderConfig) ->
%% @todo if connections parameter > 1, need new spec transport
case new_transport(ProviderConfig) of
{ok, ConnectionInfoList} ->
%% ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig),
{ok, ConnectionInfoList};
{error, Reason} ->
{error, Reason}
end.


%%ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true),


new_transport(ProviderUrl)->
case dubbo_node_config_util:parse_provider_info(ProviderUrl) of
{ok, ProviderConfig} ->
HostFlag = get_host_flag(ProviderConfig),
case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config) of
[] ->
case dubbo_exchanger:connect(ProviderUrl,?MODULE) of
{ok,ConnectionInfo} ->
{ok,[ConnectionInfo]};
{error,Reason} ->
logger:warning("start client fail ~p ~p",[Reason,HostFlag]),
{error,Reason}
end;
ConnectionInfoList ->
{ok,ConnectionInfoList}
new_transport(ProviderConfig) ->

HostFlag = get_host_flag(ProviderConfig),
case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config) of
[] ->
case dubbo_exchanger:connect(ProviderConfig, ?MODULE) of
{ok, ConnectionInfo} ->
{ok, [ConnectionInfo]};
{error, Reason} ->
logger:warning("start client fail ~p ~p", [Reason, HostFlag]),
{error, Reason}
end;
{error, R1} ->
logger:error("parse provider info error reason ~p", [R1]),
{error,R1}
ConnectionInfoList ->
{ok, ConnectionInfoList}
end.


Expand Down
45 changes: 28 additions & 17 deletions src/dubbo_provider_consumer_reg_table.erl
Expand Up @@ -29,7 +29,8 @@
terminate/2,
code_change/3]).

-export([update_consumer_connections/2,get_host_connections/2, select_connection/1, select_connection/2, update_connection_readonly/2, get_host_flag/1, get_host_flag/2]).
-export([update_consumer_connections/2,update_node_conections/2,get_interface_provider_node/1,get_host_connections/2, select_connection/1,
select_connection/2, update_connection_readonly/2, get_host_flag/1, get_host_flag/2,clean_invalid_provider/1]).

-include("dubbo.hrl").
-define(SERVER, ?MODULE).
Expand Down Expand Up @@ -193,11 +194,7 @@ start_consumer(Interface, ProviderNodeInfo) ->
get_host_connections(Host, Port) ->
HostFlag = get_host_flag(Host, Port),
List = ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag),
List2 = lists:map(
fun(#provider_node_list{host_flag = HostFlag,pid = Pid,readonly = Readonly}) ->
#connection_info{host_flag = HostFlag,pid = Pid,readonly = Readonly}
end, List),
List2.
List.


%%%===================================================================
Expand Down Expand Up @@ -239,14 +236,29 @@ get_host_connections(Host, Port) ->
%% end, ExecutesList),
%% ConnectionList.


update_node_conections(HostFlag,Connections)->
lists:map(
fun(Item) ->
HostFlag= Item#connection_info.host_flag,
case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#connection_info{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of
'$end_of_table' ->
I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item),
logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
_ ->
ok
end
end, Connections),
ok.

update_consumer_connections(Interface, Connections) ->
lists:map(
fun(Item) ->
HostFlag= Item#connection_info.host_flag,

case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#provider_node_list{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of
case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#connection_info{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of
'$end_of_table' ->
I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag,pid = Item#connection_info.pid}),
I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item),
logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
{_ObjectList,_Continuation} ->
ok
Expand All @@ -269,7 +281,7 @@ update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode)
logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]),
case IsUpdateProvideNode of
true ->
I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag, connection_info = Item}),
I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item),
logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]);
false ->
ok
Expand Down Expand Up @@ -320,18 +332,17 @@ clean_invalid_provider([HostFlag | DeleteProverList]) ->
case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of
[] ->
ok;
ProviderNodeList ->
ProviderNodeList1 = dubbo_lists_util:del_duplicate(ProviderNodeList),
clean_connection_info(ProviderNodeList1)
ProviderNodeConnections ->
ProviderNodeConnections1 = dubbo_lists_util:del_duplicate(ProviderNodeConnections),
clean_connection_info(ProviderNodeConnections1)
end,
clean_invalid_provider(DeleteProverList).

clean_connection_info(ProviderNodeList) ->
clean_connection_info(ProviderNodeConnections) ->
lists:map(fun(Item) ->
Pid = Item#provider_node_list.connection_info#connection_info.pid,
ConnectionId = Item#provider_node_list.connection_info#connection_info.connection_id,
Pid = Item#connection_info.pid,
Pattern = #interface_list{pid = Pid, _ = '_'},
ets:delete_object(?INTERFCE_LIST_TABLE, Pattern),
dubbo_transport_pool_sup:stop_children(ConnectionId)
end, ProviderNodeList),
dubbo_transport_pool_sup:stop_children(Pid)
end, ProviderNodeConnections),
ok.

0 comments on commit 216b5a7

Please sign in to comment.