Skip to content

Commit

Permalink
feat: cache classes
Browse files Browse the repository at this point in the history
  • Loading branch information
dawnwinterLiu committed May 23, 2024
1 parent 4ff0633 commit 59892a0
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 81 deletions.
33 changes: 24 additions & 9 deletions apps/dgiot/src/storage/dgiot_cache_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
-export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]).

-record(cachestate, {threshold, maxsize, cacheets, checkpid}).
-record(cachestate, {threshold, maxsize, cacheets, checkpid, skip, dskip}).


%%%===================================================================
Expand Down Expand Up @@ -83,10 +83,14 @@ init(Opts) ->
ValueEts = ets:new(?MODULE, [public, named_table, {write_concurrency, true}, {read_concurrency, true}]),
Interval = dgiot:get_env(load_cache_classes_interval, 10),
erlang:send_after(1000 * Interval, self(), load_cache_classes),
erlang:send_after(1000 * Interval, self(), load_cache_all_classes),
{ok, #cachestate{maxsize = MaxSize,
threshold = Threshold,
cacheets = ValueEts,
checkpid = CheckPid}}.
checkpid = CheckPid,
skip = 0,
dskip = 0
}}.

handle_call({get, Key}, _From, #cachestate{cacheets = ValueEts} = State) ->
Reply = get(ValueEts, Key),
Expand Down Expand Up @@ -127,15 +131,26 @@ handle_info({'load_cache_classes_fin'}, State) ->
io:format("~s ~p ~p ~n", [?FILE, ?LINE, load_cache_classes_fin]),
{noreply, State};

handle_info(load_cache_classes, State) ->
case dgiot_hook:run_hook({'dgiot', 'load_cache_classes'}, [self()]) of
{error, not_find} ->
handle_info(load_cache_all_classes, #cachestate{skip = Skip} = State) ->
case dgiot_hook:run_hook('parse_cache_Product', {Skip}) of
{ok, [{next, Next_Skip}]} ->
Interval = dgiot:get_env(load_cache_classes_interval, 10),
erlang:send_after(1000 * Interval, self(), load_cache_classes);
erlang:send_after(1000 * Interval, self(), load_cache_all_classes),
{noreply, State#cachestate{skip = Next_Skip}};
_ ->
pass
end,
{noreply, State};
dgiot_bridge_server ! {start_custom_channel},
{noreply, State}
end;

handle_info(load_cache_classes, #cachestate{dskip = DSkip} = State) ->
case dgiot_hook:run_hook('parse_cache_Device', {DSkip}) of
{ok, [{next, Next_DSkip}]} ->
Interval = dgiot:get_env(load_cache_classes_interval, 10),
erlang:send_after(1000 * Interval, self(), load_cache_classes),
{noreply, State#cachestate{dskip = Next_DSkip}};
_ ->
{noreply, State}
end;

handle_info(_Msg, State) ->
{noreply, State}.
Expand Down
2 changes: 1 addition & 1 deletion apps/dgiot_api/src/data_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ handle_info(station, #task{sessiontoken = SessionToken, env = #{<<"type">> := <<
handle_info(station, #task{sessiontoken = SessionToken, env = #{<<"type">> := <<"import">>, <<"file">> := #{<<"fullpath">> := Fullpath} = _File}} = State) ->
io:format("~s ~p State = ~p.~n", [?FILE, ?LINE, State]),
import_parse(Fullpath),
dgiot_device_cache:parse_cache_Device(<<>>),
dgiot_device_cache:parse_cache_Device(0),
restart_channel(SessionToken),
import_td(SessionToken),
import_files(),
Expand Down
12 changes: 11 additions & 1 deletion apps/dgiot_bridge/src/handler/dgiot_rule_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ do_request(get_resource_types_ctype, #{<<"cType">> := CType} = _Args, _Context,
[Params] = proplists:get_value(params, Attributes, [#{}]),
Controls =
maps:fold(fun
(Key, #{type := enum, required := Required, title := #{zh := Name}} = Param, Acc) ->
Acc ++ [
#{
<<"type">> => <<"select">>,
<<"label">> => Name,
<<"name">> => <<"profile.", Key/binary>>,
<<"required">> => Required,
<<"options">> => maps:get(enum, Param, [])
}
];
(Key, #{default := Default, type := Type, required := Required, title := #{zh := Name}}, Acc) ->
Acc ++ [
#{
Expand All @@ -222,7 +232,7 @@ do_request(get_resource_types_ctype, #{<<"cType">> := CType} = _Args, _Context,
<<"name">> => <<"profile.", Key/binary>>,
<<"required">> => Required,
<<"placeholder">> => Default
}
}
];
(Key, _, Acc) ->
Acc ++ [
Expand Down
4 changes: 2 additions & 2 deletions apps/dgiot_device/src/dgiot_device.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
-export([put_color/3, get_color/2, put_location/3, get_location/1, get_address/3, get_productid/1]).
-export([get_acl/1, get_readonly_acl/1, save_log/3, get_url/1, get_appname/1]).

parse_cache_Device(_ClasseName) ->
dgiot_device_cache:parse_cache_Device(_ClasseName).
parse_cache_Device(Skip) ->
dgiot_device_cache:parse_cache_Device(Skip).

sync_parse(OffLine) ->
dgiot_device_cache:sync_parse(OffLine).
Expand Down
2 changes: 2 additions & 0 deletions apps/dgiot_device/src/dgiot_device_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ stop(_State) ->

start_hook() ->
dgiot_hook:add(one_for_one, 'parse_cache_Device', fun dgiot_device:parse_cache_Device/1),
dgiot_hook:add(one_for_one, 'parse_cache_Product', fun dgiot_product:load_all_cache/1),
dgiot_hook:add(one_for_one, {'topo', <<"counter">>}, fun dgiot_device_static:get_counter/1),
dgiot_hook:add(one_for_one, {'topo', <<"realdata">>}, fun dgiot_device_static:get_realdata/1),
dgiot_hook:add(one_for_one, {'topo', <<"pie">>}, fun dgiot_device_static:get_pie/1).

stop_hook() ->
dgiot_hook:remove('parse_cache_Device'),
dgiot_hook:remove('parse_cache_Product'),
dgiot_hook:remove({'topo', <<"counter">>}),
dgiot_hook:remove({'topo', <<"realdata">>}),
dgiot_hook:remove({'topo', <<"pie">>}).
105 changes: 54 additions & 51 deletions apps/dgiot_device/src/dgiot_product.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,33 @@
-include("dgiot_device.hrl").
-include_lib("dgiot/include/logger.hrl").
-dgiot_data("ets").
-export([init_ets/0, load_cache/0, local/1, save/1, put/1, get/1, delete/1, save_prod/2, lookup_prod/1]).
-export([init_ets/0, load_all_cache/1, local/1, save/1, put/1, get/1, delete/1, save_prod/2, lookup_prod/1]).
-export([parse_frame/3, to_frame/2]).
-export([create_product/1, create_product/2, add_product_relation/2, delete_product_relation/1]).
-export([get_prop/1, get_props/1, get_props/2, get_unit/1, update_properties/2, update_properties/0]).
-export([update_topics/0, update_product_filed/1]).
-export([save_devicetype/1, get_devicetype/1, get_device_thing/2, get_productSecret/1]).
-export([save_keys/1, get_keys/1, get_control/1, save_control/1, get_interval/1, save_device_thingtype/1, get_product_identifier/2]).
-export([save_/1, get_keys/1, get_control/1, save_control/1, get_interval/1, save_device_thingtype/1, get_product_identifier/2]).

init_ets() ->
dgiot_data:init(?DGIOT_PRODUCT,[public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
dgiot_data:init(?DGIOT_PRODUCT_IDENTIFIE,[public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
dgiot_data:init(?DGIOT_CHANNEL_SESSION,[public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
dgiot_data:init(?DEVICE_DEVICE_COLOR,[public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
dgiot_data:init(?DEVICE_PROFILE,[public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]).

load_cache() ->
Success = fun(Page) ->
lists:map(fun(Product) ->
dgiot_product:save(Product)
end, Page)
end,
Query = #{
<<"where">> => #{}
},
dgiot_parse_loader:start(<<"Product">>, Query, 0, 1000, 1000000, Success).
dgiot_data:init(?DGIOT_PRODUCT, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
dgiot_data:init(?DGIOT_PRODUCT_IDENTIFIE, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
dgiot_data:init(?DGIOT_CHANNEL_SESSION, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
dgiot_data:init(?DEVICE_DEVICE_COLOR, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
dgiot_data:init(?DEVICE_PROFILE, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]).

load_all_cache({Skip}) ->
case dgiot_parsex:query_object(<<"Product">>, #{<<"limit">> => 1000, <<"skip">> => Skip}) of
{ok, #{<<"results">> := Results}} when length(Results) == 0 ->
load_end;
{ok, #{<<"results">> := Products}} ->
lists:map(fun(Product) ->
dgiot_product:save(Product)
end, Products),
{next, Skip + 1000};
_ ->
{next, Skip}
end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
save_prod(ProductId, #{<<"thing">> := _thing} = Product) ->
Expand Down Expand Up @@ -74,18 +76,12 @@ save(Product) ->
#{<<"productId">> := ProductId} = Product1,
dgiot_data:delete(?DGIOT_PRODUCT, ProductId),
dgiot_data:insert(?DGIOT_PRODUCT, ProductId, Product1),
save_keys(ProductId),
save_control(ProductId),
save_devicetype(ProductId),
save_(ProductId),
save_productSecret(ProductId),
dgiot_product_channel:save_channel(ProductId),
dgiot_product_channel:save_tdchannel(ProductId),
dgiot_product_channel:save_taskchannel(ProductId),
save_device_thingtype(ProductId),
save_product_identifier(ProductId),
dgiot_product_enum:save_product_enum(ProductId),
timer:sleep(10000),
dgiot_bridge_server ! {start_custom_channel},
%% dgiot_product_enum:save_product_enum(ProductId),
{ok, Product1}.

put(Product) ->
Expand Down Expand Up @@ -175,6 +171,14 @@ get_devicetype(ProductId) ->


%% 设备类型
save_device_thingtype(ProductId, DeviceType, NewMap) ->
case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}) of
not_find ->
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, NewMap);
Map ->
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, dgiot_map:merge(Map, NewMap))
end.

save_device_thingtype(ProductId) ->
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
Expand All @@ -196,20 +200,6 @@ save_device_thingtype(ProductId) ->
end.

%% 物模型标识符
save_product_identifier(ProductId) ->
delete_product_identifier(ProductId),
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Props} = Thing}} ->
Tags = maps:get(<<"tags">>, Thing, []),
lists:map(
fun(#{<<"identifier">> := Identifie} = Prop) ->
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifie, identifie}, Prop)
end, Props ++ Tags);

_Error ->
[]
end.

delete_product_identifier(ProductId) ->
Fun =
fun
Expand Down Expand Up @@ -310,22 +300,35 @@ update_product_filed(_Filed) ->
pass
end.

save_keys(ProductId) ->
Keys =
save_(ProductId) ->
{Keys, Control, DeviceTypes} =
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
{ok, #{<<"thing">> := #{<<"properties">> := Props} = Thing}} ->
delete_product_identifier(ProductId),
Tags = maps:get(<<"tags">>, Thing, []),
lists:foldl(
fun
(#{<<"identifier">> := Identifier, <<"isstorage">> := true}, Acc) ->
Acc ++ [Identifier];
(_, Acc) ->
Acc
end, [], Props);

(#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"isstorage">> := true, <<"profile">> := Profile, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) ->
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop),
save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}),
{Acc ++ [Identifier], Ccc#{Identifier => Profile}, Dcc ++ [DeviceType]};
(#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"profile">> := Profile, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) ->
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop),
save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}),
{Acc, Ccc#{Identifier => Profile}, Dcc ++ [DeviceType]};
(#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"isstorage">> := true, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) ->
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop),
save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}),
{Acc ++ [Identifier], Ccc, Dcc ++ [DeviceType]};
(_, {Acc, Ccc, Dcc}) ->
{Acc, Ccc, Dcc}
end, {[], #{}, []}, Props ++ Tags);
_Error ->
[]
{[], #{}, []}
end,
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, keys}, Keys).
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, keys}, Keys),
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, profile_control}, Control),
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, devicetype}, dgiot_utils:unique_2(DeviceTypes)).

get_keys(ProductId) ->
case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, keys}) of
Expand Down
30 changes: 13 additions & 17 deletions apps/dgiot_device/src/utils/dgiot_device_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,19 @@ init_ets() ->
dgiot_data:init(?DGIOT_LOCATION_ADDRESS, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]).

%% Device 数量统计,权限统计,在线离线统计,产品下面设备数量统计等是用户非常关系的数据指标
parse_cache_Device(_ClassName) ->
%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, ClassName]),
dgiot_product:load_cache(),
Success = fun(Page) ->
lists:map(fun(#{<<"devaddr">> := _Devaddr} = Device) ->
%% save_profile(Device),
timer:sleep(2),
%% io:format("Devaddr ~p ~n",[Devaddr]),
dgiot_device:save(Device)
end, Page)
end,
Query = #{
<<"order">> => <<"updatedAt">>,
<<"keys">> => [<<"ACL">>, <<"updatedAt">>, <<"state">>, <<"devaddr">>, <<"status">>, <<"isEnable">>, <<"profile">>, <<"product">>, <<"location">>, <<"deviceSecret">>],
<<"where">> => #{}
},
dgiot_parse_loader:start(<<"Device">>, Query, 0, 100, 1000000, Success).
parse_cache_Device({Skip}) ->
case dgiot_parsex:query_object(<<"Device">>, #{<<"limit">> => 1000, <<"skip">> => Skip,
<<"keys">> => [<<"ACL">>, <<"updatedAt">>, <<"state">>, <<"devaddr">>, <<"status">>, <<"isEnable">>, <<"profile">>, <<"product">>, <<"location">>, <<"deviceSecret">>]}) of
{ok, #{<<"results">> := Results}} when length(Results) == 0 ->
load_end;
{ok, #{<<"results">> := Page}} ->
lists:map(fun(Device) ->
dgiot_device:save(Device)
end, Page),
{next, Skip + 1000};
_ ->
{next, Skip}
end.

save(ProductId, DevAddr) ->
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
Expand Down

0 comments on commit 59892a0

Please sign in to comment.