diff --git a/tests/verify_basic_upgrade.erl b/tests/verify_basic_upgrade.erl index 330dc52f3..b61660bd9 100644 --- a/tests/verify_basic_upgrade.erl +++ b/tests/verify_basic_upgrade.erl @@ -22,52 +22,185 @@ -export([confirm/0]). -include_lib("eunit/include/eunit.hrl"). --define(NUM_KEYS, 20000). +-define(NUM_KEYS, 60000). --define(CONFIG(RingSize, NVal), - [ - {riak_core, +-define(CONFIG_CORE(RingSize, NVal), + {riak_core, + [ + {ring_creation_size, RingSize}, + {default_bucket_props, [ - {ring_creation_size, RingSize}, - {default_bucket_props, - [ - {n_val, NVal}, - {allow_mult, true}, - {dvv_enabled, true} - ]} + {n_val, NVal}, + {allow_mult, true}, + {dvv_enabled, true} + ]} + ] + }). + + +-define(CONFIG_PLAINTEXT(CoreConfig), +[ + CoreConfig, + {leveled, + [ + {journal_objectcount, 2000}, + {compression_point, on_compact} + ]}, + {eleveldb, + [ + {compression, false} ] - }, + } + ]). + +-define(CONFIG_NATIVE(CoreConfig), + [ + CoreConfig, {leveled, [ - {journal_objectcount, 2000} - % setting low object count ensures we test rolled journal - % files, not just active ones - ]} + {journal_objectcount, 2000}, + {compression_point, on_receipt}, + {compression_method, native} + ]}, + {eleveldb, + [ + {compression, snappy} + ] + } ]). +-define(CONFIG_LZ4(CoreConfig), + [ + CoreConfig, + {leveled, + [ + {journal_objectcount, 2000}, + {compression_point, on_receipt}, + {compression_method, lz4} + ]}, + {eleveldb, + [ + {compression, lz4} + ] + } + ]). + confirm() -> TestMetaData = riak_test_runner:metadata(), + KVBackend = proplists:get_value(backend, TestMetaData), OldVsn = proplists:get_value(upgrade_version, TestMetaData, previous), - [Nodes] = - rt:build_clusters([{4, OldVsn, ?CONFIG(8, 3)}]), + lager:info("*****************************"), + lager:info("Testing without compression"), + lager:info("*****************************"), + + [NodesPlainText] = + rt:build_clusters([{4, OldVsn, ?CONFIG_PLAINTEXT(?CONFIG_CORE(8,3))}]), + PlainTextSz = verify_basic_upgrade(NodesPlainText, OldVsn), + + case KVBackend of + bitcask -> + pass; + _ -> + rt:clean_cluster(NodesPlainText), + + lager:info("*****************************"), + lager:info("Testing with native compression"), + lager:info("*****************************"), + [NodesNative] = + rt:build_clusters( + [{4, OldVsn, ?CONFIG_NATIVE(?CONFIG_CORE(8,3))}]), + NativeSz = verify_basic_upgrade(NodesNative, OldVsn), + ?assert(NativeSz < floor((PlainTextSz * 0.8))), + + rt:clean_cluster(NodesNative), + + lager:info("*****************************"), + lager:info("Testing with lz4 compression"), + lager:info("*****************************"), + [NodesLZ4] = + rt:build_clusters( + [{4, OldVsn, ?CONFIG_LZ4(?CONFIG_CORE(8, 3))}]), + LZ4Sz = verify_basic_upgrade(NodesLZ4, OldVsn), + ?assert(LZ4Sz < floor((PlainTextSz * 0.8))), + pass + end. + +verify_basic_upgrade(Nodes, OldVsn) -> [Node1|_] = Nodes, + V = compressable_value(), lager:info("Writing ~w keys to ~p", [?NUM_KEYS, Node1]), - rt:systest_write(Node1, ?NUM_KEYS, 3), - ?assertEqual([], rt:systest_read(Node1, ?NUM_KEYS, 1)), + rt:systest_write(Node1, 1, ?NUM_KEYS, <<"B1">>, 2, V), + validate_value(Node1, <<"B1">>, 1, ?NUM_KEYS, V, 0.1), [upgrade(Node, current) || Node <- Nodes], + + validate_value(Node1, <<"B1">>, 1, ?NUM_KEYS, V, 1.0), + + lager:info("Writing ~w keys to ~p", [?NUM_KEYS div 4, Node1]), + rt:systest_write(Node1, 1, ?NUM_KEYS div 4, <<"B2">>, 2, V), %% Umm.. technically, it'd downgrade [upgrade(Node, OldVsn) || Node <- Nodes], - pass. + + validate_value(Node1, <<"B1">>, 1, ?NUM_KEYS, V, 0.1), + validate_value(Node1, <<"B2">>, 1, ?NUM_KEYS div 4, V, 1.0), + + backend_size(Node1). upgrade(Node, NewVsn) -> lager:info("Upgrading ~p to ~p", [Node, NewVsn]), rt:upgrade(Node, NewVsn), rt:wait_for_service(Node, riak_kv), - lager:info("Ensuring keys still exist"), - ?assertEqual([], rt:systest_read(Node, ?NUM_KEYS, 1)), ok. + +backend_size(Node) -> + TestMetaData = riak_test_runner:metadata(), + KVBackend = proplists:get_value(backend, TestMetaData), + {ok, DataDir} = + rpc:call(Node, application, get_env, [riak_core, platform_data_dir]), + BackendDir = filename:join(DataDir, base_dir_for_backend(KVBackend)), + SzTxt = rpc:call(Node, os, cmd, ["du -sh " ++ BackendDir]), + lager:info("Backend size ~s", [SzTxt]), + {match, [SzOnly]} = + re:run(SzTxt, "(?[0-9]+)M.*", [{capture, all_names, list}]), + list_to_integer(SzOnly). + +base_dir_for_backend(leveled) -> + "leveled"; +base_dir_for_backend(bitcask) -> + "bitcask"; +base_dir_for_backend(eleveldb) -> + "leveldb". + +compressable_value() -> + T = "Value which repeats value and then says value again", + list_to_binary( + lists:flatten(lists:map(fun(_I) -> T end, lists:seq(1, 10))) + ). + +validate_value(Node, Bucket, StartKey, EndKey, Value, CheckPerc) -> + KeyCount = (1 + EndKey - StartKey), + lager:info( + "Verifying ~w keys of ~w from Bucket ~p Node ~p", + [floor(CheckPerc * KeyCount), KeyCount, Bucket, Node]), + {ok, C} = riak:client_connect(Node), + lists:foreach( + fun(N) -> + case rand:uniform() of + R when R < CheckPerc -> + {ok, O} = riak_client:get(Bucket, <>, C), + ?assert(value_matches(riak_object:get_value(O), N, Value)); + _ -> + skip + end + end, + lists:seq(StartKey, EndKey) + ). + +value_matches(<>, N, CommonValBin) -> + true; +value_matches(_WrongVal, _N, _CommonValBin) -> + false. \ No newline at end of file