diff --git a/apps/dgiot/src/storage/dgiot_cache_worker.erl b/apps/dgiot/src/storage/dgiot_cache_worker.erl index c63a056269..cdd8277ff8 100644 --- a/apps/dgiot/src/storage/dgiot_cache_worker.erl +++ b/apps/dgiot/src/storage/dgiot_cache_worker.erl @@ -27,7 +27,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(cachestate, {threshold, maxsize, cacheets, checkpid}). +-record(cachestate, {threshold, maxsize, cacheets, checkpid, skip, dskip}). %%%=================================================================== @@ -83,10 +83,14 @@ init(Opts) -> ValueEts = ets:new(?MODULE, [public, named_table, {write_concurrency, true}, {read_concurrency, true}]), Interval = dgiot:get_env(load_cache_classes_interval, 10), erlang:send_after(1000 * Interval, self(), load_cache_classes), + erlang:send_after(1000 * Interval, self(), load_cache_all_classes), {ok, #cachestate{maxsize = MaxSize, threshold = Threshold, cacheets = ValueEts, - checkpid = CheckPid}}. + checkpid = CheckPid, + skip = 0, + dskip = 0 + }}. handle_call({get, Key}, _From, #cachestate{cacheets = ValueEts} = State) -> Reply = get(ValueEts, Key), @@ -127,15 +131,26 @@ handle_info({'load_cache_classes_fin'}, State) -> io:format("~s ~p ~p ~n", [?FILE, ?LINE, load_cache_classes_fin]), {noreply, State}; -handle_info(load_cache_classes, State) -> - case dgiot_hook:run_hook({'dgiot', 'load_cache_classes'}, [self()]) of - {error, not_find} -> +handle_info(load_cache_all_classes, #cachestate{skip = Skip} = State) -> + case dgiot_hook:run_hook('parse_cache_Product', {Skip}) of + {ok, [{next, Next_Skip}]} -> Interval = dgiot:get_env(load_cache_classes_interval, 10), - erlang:send_after(1000 * Interval, self(), load_cache_classes); + erlang:send_after(1000 * Interval, self(), load_cache_all_classes), + {noreply, State#cachestate{skip = Next_Skip}}; _ -> - pass - end, - {noreply, State}; + dgiot_bridge_server ! {start_custom_channel}, + {noreply, State} + end; + +handle_info(load_cache_classes, #cachestate{dskip = DSkip} = State) -> + case dgiot_hook:run_hook('parse_cache_Device', {DSkip}) of + {ok, [{next, Next_DSkip}]} -> + Interval = dgiot:get_env(load_cache_classes_interval, 10), + erlang:send_after(1000 * Interval, self(), load_cache_classes), + {noreply, State#cachestate{dskip = Next_DSkip}}; + _ -> + {noreply, State} + end; handle_info(_Msg, State) -> {noreply, State}. diff --git a/apps/dgiot_api/src/data_worker.erl b/apps/dgiot_api/src/data_worker.erl index 30c90bc40f..77b007764d 100644 --- a/apps/dgiot_api/src/data_worker.erl +++ b/apps/dgiot_api/src/data_worker.erl @@ -97,7 +97,7 @@ handle_info(station, #task{sessiontoken = SessionToken, env = #{<<"type">> := << handle_info(station, #task{sessiontoken = SessionToken, env = #{<<"type">> := <<"import">>, <<"file">> := #{<<"fullpath">> := Fullpath} = _File}} = State) -> io:format("~s ~p State = ~p.~n", [?FILE, ?LINE, State]), import_parse(Fullpath), - dgiot_device_cache:parse_cache_Device(<<>>), + dgiot_device_cache:parse_cache_Device(0), restart_channel(SessionToken), import_td(SessionToken), import_files(), diff --git a/apps/dgiot_bridge/src/handler/dgiot_rule_handler.erl b/apps/dgiot_bridge/src/handler/dgiot_rule_handler.erl index f03162ec0d..6d99642eff 100644 --- a/apps/dgiot_bridge/src/handler/dgiot_rule_handler.erl +++ b/apps/dgiot_bridge/src/handler/dgiot_rule_handler.erl @@ -214,6 +214,16 @@ do_request(get_resource_types_ctype, #{<<"cType">> := CType} = _Args, _Context, [Params] = proplists:get_value(params, Attributes, [#{}]), Controls = maps:fold(fun + (Key, #{type := enum, required := Required, title := #{zh := Name}} = Param, Acc) -> + Acc ++ [ + #{ + <<"type">> => <<"select">>, + <<"label">> => Name, + <<"name">> => <<"profile.", Key/binary>>, + <<"required">> => Required, + <<"options">> => maps:get(enum, Param, []) + } + ]; (Key, #{default := Default, type := Type, required := Required, title := #{zh := Name}}, Acc) -> Acc ++ [ #{ @@ -222,7 +232,7 @@ do_request(get_resource_types_ctype, #{<<"cType">> := CType} = _Args, _Context, <<"name">> => <<"profile.", Key/binary>>, <<"required">> => Required, <<"placeholder">> => Default - } + } ]; (Key, _, Acc) -> Acc ++ [ diff --git a/apps/dgiot_device/src/dgiot_device.erl b/apps/dgiot_device/src/dgiot_device.erl index dac9d31ae0..f6c65f5691 100644 --- a/apps/dgiot_device/src/dgiot_device.erl +++ b/apps/dgiot_device/src/dgiot_device.erl @@ -27,8 +27,8 @@ -export([put_color/3, get_color/2, put_location/3, get_location/1, get_address/3, get_productid/1]). -export([get_acl/1, get_readonly_acl/1, save_log/3, get_url/1, get_appname/1]). -parse_cache_Device(_ClasseName) -> - dgiot_device_cache:parse_cache_Device(_ClasseName). +parse_cache_Device(Skip) -> + dgiot_device_cache:parse_cache_Device(Skip). sync_parse(OffLine) -> dgiot_device_cache:sync_parse(OffLine). diff --git a/apps/dgiot_device/src/dgiot_device_app.erl b/apps/dgiot_device/src/dgiot_device_app.erl index fb03d21978..2d45df095a 100644 --- a/apps/dgiot_device/src/dgiot_device_app.erl +++ b/apps/dgiot_device/src/dgiot_device_app.erl @@ -21,12 +21,14 @@ stop(_State) -> start_hook() -> dgiot_hook:add(one_for_one, 'parse_cache_Device', fun dgiot_device:parse_cache_Device/1), + dgiot_hook:add(one_for_one, 'parse_cache_Product', fun dgiot_product:load_all_cache/1), dgiot_hook:add(one_for_one, {'topo', <<"counter">>}, fun dgiot_device_static:get_counter/1), dgiot_hook:add(one_for_one, {'topo', <<"realdata">>}, fun dgiot_device_static:get_realdata/1), dgiot_hook:add(one_for_one, {'topo', <<"pie">>}, fun dgiot_device_static:get_pie/1). stop_hook() -> dgiot_hook:remove('parse_cache_Device'), + dgiot_hook:remove('parse_cache_Product'), dgiot_hook:remove({'topo', <<"counter">>}), dgiot_hook:remove({'topo', <<"realdata">>}), dgiot_hook:remove({'topo', <<"pie">>}). diff --git a/apps/dgiot_device/src/dgiot_product.erl b/apps/dgiot_device/src/dgiot_product.erl index bb69a4529d..712827b263 100644 --- a/apps/dgiot_device/src/dgiot_product.erl +++ b/apps/dgiot_device/src/dgiot_product.erl @@ -19,31 +19,33 @@ -include("dgiot_device.hrl"). -include_lib("dgiot/include/logger.hrl"). -dgiot_data("ets"). --export([init_ets/0, load_cache/0, local/1, save/1, put/1, get/1, delete/1, save_prod/2, lookup_prod/1]). +-export([init_ets/0, load_all_cache/1, local/1, save/1, put/1, get/1, delete/1, save_prod/2, lookup_prod/1]). -export([parse_frame/3, to_frame/2]). -export([create_product/1, create_product/2, add_product_relation/2, delete_product_relation/1]). -export([get_prop/1, get_props/1, get_props/2, get_unit/1, update_properties/2, update_properties/0]). -export([update_topics/0, update_product_filed/1]). -export([save_devicetype/1, get_devicetype/1, get_device_thing/2, get_productSecret/1]). --export([save_keys/1, get_keys/1, get_control/1, save_control/1, get_interval/1, save_device_thingtype/1, get_product_identifier/2]). +-export([save_/1, get_keys/1, get_control/1, save_control/1, get_interval/1, save_device_thingtype/1, get_product_identifier/2]). init_ets() -> - dgiot_data:init(?DGIOT_PRODUCT,[public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), - dgiot_data:init(?DGIOT_PRODUCT_IDENTIFIE,[public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), - dgiot_data:init(?DGIOT_CHANNEL_SESSION,[public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), - dgiot_data:init(?DEVICE_DEVICE_COLOR,[public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), - dgiot_data:init(?DEVICE_PROFILE,[public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]). - -load_cache() -> - Success = fun(Page) -> - lists:map(fun(Product) -> - dgiot_product:save(Product) - end, Page) - end, - Query = #{ - <<"where">> => #{} - }, - dgiot_parse_loader:start(<<"Product">>, Query, 0, 1000, 1000000, Success). + dgiot_data:init(?DGIOT_PRODUCT, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), + dgiot_data:init(?DGIOT_PRODUCT_IDENTIFIE, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), + dgiot_data:init(?DGIOT_CHANNEL_SESSION, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), + dgiot_data:init(?DEVICE_DEVICE_COLOR, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), + dgiot_data:init(?DEVICE_PROFILE, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]). + +load_all_cache({Skip}) -> + case dgiot_parsex:query_object(<<"Product">>, #{<<"limit">> => 1000, <<"skip">> => Skip}) of + {ok, #{<<"results">> := Results}} when length(Results) == 0 -> + load_end; + {ok, #{<<"results">> := Products}} -> + lists:map(fun(Product) -> + dgiot_product:save(Product) + end, Products), + {next, Skip + 1000}; + _ -> + {next, Skip} + end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% save_prod(ProductId, #{<<"thing">> := _thing} = Product) -> @@ -74,18 +76,12 @@ save(Product) -> #{<<"productId">> := ProductId} = Product1, dgiot_data:delete(?DGIOT_PRODUCT, ProductId), dgiot_data:insert(?DGIOT_PRODUCT, ProductId, Product1), - save_keys(ProductId), - save_control(ProductId), - save_devicetype(ProductId), + save_(ProductId), save_productSecret(ProductId), dgiot_product_channel:save_channel(ProductId), dgiot_product_channel:save_tdchannel(ProductId), dgiot_product_channel:save_taskchannel(ProductId), - save_device_thingtype(ProductId), - save_product_identifier(ProductId), - dgiot_product_enum:save_product_enum(ProductId), - timer:sleep(10000), - dgiot_bridge_server ! {start_custom_channel}, +%% dgiot_product_enum:save_product_enum(ProductId), {ok, Product1}. put(Product) -> @@ -175,6 +171,14 @@ get_devicetype(ProductId) -> %% 设备类型 +save_device_thingtype(ProductId, DeviceType, NewMap) -> + case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}) of + not_find -> + dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, NewMap); + Map -> + dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, dgiot_map:merge(Map, NewMap)) + end. + save_device_thingtype(ProductId) -> case dgiot_product:lookup_prod(ProductId) of {ok, #{<<"thing">> := #{<<"properties">> := Props}}} -> @@ -196,20 +200,6 @@ save_device_thingtype(ProductId) -> end. %% 物模型标识符 -save_product_identifier(ProductId) -> - delete_product_identifier(ProductId), - case dgiot_product:lookup_prod(ProductId) of - {ok, #{<<"thing">> := #{<<"properties">> := Props} = Thing}} -> - Tags = maps:get(<<"tags">>, Thing, []), - lists:map( - fun(#{<<"identifier">> := Identifie} = Prop) -> - dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifie, identifie}, Prop) - end, Props ++ Tags); - - _Error -> - [] - end. - delete_product_identifier(ProductId) -> Fun = fun @@ -310,22 +300,35 @@ update_product_filed(_Filed) -> pass end. -save_keys(ProductId) -> - Keys = +save_(ProductId) -> + {Keys, Control, DeviceTypes} = case dgiot_product:lookup_prod(ProductId) of - {ok, #{<<"thing">> := #{<<"properties">> := Props}}} -> + {ok, #{<<"thing">> := #{<<"properties">> := Props} = Thing}} -> + delete_product_identifier(ProductId), + Tags = maps:get(<<"tags">>, Thing, []), lists:foldl( fun - (#{<<"identifier">> := Identifier, <<"isstorage">> := true}, Acc) -> - Acc ++ [Identifier]; - (_, Acc) -> - Acc - end, [], Props); - + (#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"isstorage">> := true, <<"profile">> := Profile, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) -> + dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop), + save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}), + {Acc ++ [Identifier], Ccc#{Identifier => Profile}, Dcc ++ [DeviceType]}; + (#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"profile">> := Profile, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) -> + dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop), + save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}), + {Acc, Ccc#{Identifier => Profile}, Dcc ++ [DeviceType]}; + (#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"isstorage">> := true, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) -> + dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop), + save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}), + {Acc ++ [Identifier], Ccc, Dcc ++ [DeviceType]}; + (_, {Acc, Ccc, Dcc}) -> + {Acc, Ccc, Dcc} + end, {[], #{}, []}, Props ++ Tags); _Error -> - [] + {[], #{}, []} end, - dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, keys}, Keys). + dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, keys}, Keys), + dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, profile_control}, Control), + dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, devicetype}, dgiot_utils:unique_2(DeviceTypes)). get_keys(ProductId) -> case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, keys}) of diff --git a/apps/dgiot_device/src/utils/dgiot_device_cache.erl b/apps/dgiot_device/src/utils/dgiot_device_cache.erl index 4479560d20..6495c860d5 100644 --- a/apps/dgiot_device/src/utils/dgiot_device_cache.erl +++ b/apps/dgiot_device/src/utils/dgiot_device_cache.erl @@ -29,23 +29,19 @@ init_ets() -> dgiot_data:init(?DGIOT_LOCATION_ADDRESS, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]). %% Device 数量统计,权限统计,在线离线统计,产品下面设备数量统计等是用户非常关系的数据指标 -parse_cache_Device(_ClassName) -> -%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, ClassName]), - dgiot_product:load_cache(), - Success = fun(Page) -> - lists:map(fun(#{<<"devaddr">> := _Devaddr} = Device) -> -%% save_profile(Device), - timer:sleep(2), -%% io:format("Devaddr ~p ~n",[Devaddr]), - dgiot_device:save(Device) - end, Page) - end, - Query = #{ - <<"order">> => <<"updatedAt">>, - <<"keys">> => [<<"ACL">>, <<"updatedAt">>, <<"state">>, <<"devaddr">>, <<"status">>, <<"isEnable">>, <<"profile">>, <<"product">>, <<"location">>, <<"deviceSecret">>], - <<"where">> => #{} - }, - dgiot_parse_loader:start(<<"Device">>, Query, 0, 100, 1000000, Success). +parse_cache_Device({Skip}) -> + case dgiot_parsex:query_object(<<"Device">>, #{<<"limit">> => 1000, <<"skip">> => Skip, + <<"keys">> => [<<"ACL">>, <<"updatedAt">>, <<"state">>, <<"devaddr">>, <<"status">>, <<"isEnable">>, <<"profile">>, <<"product">>, <<"location">>, <<"deviceSecret">>]}) of + {ok, #{<<"results">> := Results}} when length(Results) == 0 -> + load_end; + {ok, #{<<"results">> := Page}} -> + lists:map(fun(Device) -> + dgiot_device:save(Device) + end, Page), + {next, Skip + 1000}; + _ -> + {next, Skip} + end. save(ProductId, DevAddr) -> DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),