diff --git a/IOQ2.md b/IOQ2.md new file mode 100644 index 0000000..ddf8d22 --- /dev/null +++ b/IOQ2.md @@ -0,0 +1,812 @@ +# IOQ2 Overview + +IOQ2 is a replacement for the original IOQ, with the core motivation to create +a faster IOQ that eliminates the need for IOQ bypasses. This is achieved with +two primary approaches: + + 1. Faster data structures + 2. More processes + +IOQ2 also provides a more capable configuration system allowing +(de)prioritization at the class, user, and shard/class levels. This means you +can do things like bumping up global compaction priority, deprioritizing a +problematic MT user, and bumping up view build priority on an individual shard. + + +## Faster Data Structures + +One of the issues with IOQ1 is that it uses standard Erlang list and queue data +structures for doing high volume modifications to the request queues and +tracking of active requests. This quickly becomes a bottleneck, and results in +IOQ1 being roughly an order of magnitude slower than bypassing it, in high +throughput scenarios. + +We work around this issue in IOQ2 by introducing the `hqueue` data structure, +which is an Erlang NIF wrapper around a simple mutable priority queue that only +does in place updates. This queue is minimal in functionality and only supports +floating point values as the priority sorting mechanism. The result is that +IOQ2 focuses on a multiplicative prioritization scheme using chained +multipliers allowing for different weights to classes, users, and shard/class +pairs. This prioritization scheme can easily be extended later on to include +additional attributes, such as CCM tier, or perhaps a feedback loop providing +prioritization on overall request volumes/cost per user. 
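To make the chained-multiplier idea concrete, here is a minimal sketch (Python as a neutral illustration; the dictionaries and the `prioritize` helper are hypothetical stand-ins, not IOQ2's actual API):

```python
# Sketch of IOQ2-style chained multipliers: each dimension contributes a
# multiplier, with 1.0 as the identity default for anything unconfigured.
DEFAULT_PRIORITY = 1.0

def prioritize(class_p, user_p, shard_class_p, klass, user, shard):
    """Multiply the class, user, and shard/class multipliers for a request."""
    return (class_p.get(klass, DEFAULT_PRIORITY)
            * user_p.get(user, DEFAULT_PRIORITY)
            * shard_class_p.get((shard, klass), DEFAULT_PRIORITY))

class_p = {"db_compact": 0.0001, "interactive": 1.0}
user_p = {"foo": 3.7}
shard_class_p = {("shards/00000000-1fffffff/foo", "interactive"): 2.0}

# Compaction from an unconfigured user/shard: 0.0001 * 1.0 * 1.0 = 0.0001
low = prioritize(class_p, user_p, shard_class_p, "db_compact", "bar", "s1")
# Interactive from "foo" on the boosted shard: 1.0 * 3.7 * 2.0 = 7.4
high = prioritize(class_p, user_p, shard_class_p, "interactive", "foo",
                  "shards/00000000-1fffffff/foo")
```

Extending the scheme with a new dimension (such as a CCM tier) is then just one more factor in the product.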
+
+
+## More Processes
+
+The introduction of `hqueue` has a considerable impact on IOQ and more than
+doubles the throughput of IOQ1. However, as mentioned above, IOQ1 bypasses can
+result in an order of magnitude performance difference, so clearly faster data
+structures are not sufficient to solve the problem. The main issue is that an
+Erlang process can only go as fast as a single CPU core will allow, so as we
+continually add more CPU cores to production systems, single process IOQ
+becomes even more of a problem.
+
+Having all requests funnel through a single Erlang process will
+inevitably become a bottleneck. That said, the entire point of IOQ is that it
+*IS* a bottleneck! If there are no queued requests, then there's nothing to
+prioritize, making the use of IOQ questionable at best. The balancing act here
+is getting as close to IOQ bypass performance as we can while inducing enough
+of a bottleneck to effectively be able to prioritize different types of work.
+
+IOQ2 uses a set of IOQ2 processes to achieve similar levels of performance as
+an IOQ bypass. It creates a set of named `ioq_server_$N` processes, one for
+each Erlang scheduler in the VM. Caller requests are then dispatched to the
+appropriate IOQ2 server based on the current scheduler of the caller. Overall
+this works quite well and seems to be an effective way of ensuring sufficient
+request volume to have a queue backlog to prioritize, while also spreading out
+load as the Erlang VM spreads load across more schedulers, as needed. There is
+an experimental option to bind the IOQ2 pids to the relevant schedulers, but so
+far this has not shown conclusive improvements during limited testing. Another
+potential approach here is to randomize requests to the different IOQ2 pids.
+More fine-grained testing with a sharp eye on latency distributions would be
+useful here.
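The per-scheduler dispatch can be sketched as follows (Python as a neutral illustration; the helper is hypothetical and stands in for the actual dispatch code):

```python
# Sketch of per-scheduler dispatch: one named server per VM scheduler, with a
# caller routed to the server matching its current scheduler id. Load then
# spreads across IOQ2 servers exactly as the VM spreads callers across
# schedulers. The names follow the ioq_server_$N convention; the helper itself
# is illustrative only.
NUM_SCHEDULERS = 4  # typically the number of CPU cores

def server_for(scheduler_id):
    """Map a caller's current scheduler id to its dedicated IOQ2 server."""
    if not 1 <= scheduler_id <= NUM_SCHEDULERS:
        raise ValueError("unknown scheduler id")
    return "ioq_server_%d" % scheduler_id

# Callers on different schedulers land on different servers.
assert server_for(1) == "ioq_server_1"
assert server_for(3) == "ioq_server_3"
```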
+
+
+# The (h)queue
+
+The new queue in IOQ2 is the `hqueue` NIF, which is a mutable heap-based
+max priority queue that prioritizes on a single floating point value. What this
+means in practice is that every request gets a numeric floating point priority
+value, and is then thrown into the queue until it is popped as the max value.
+The result is a priority queue data structure that inserts new items in
+O(log(N)) and also extracts the maximum item in O(log(N)), resulting in a very
+performant data structure for the application at hand.
+
+
+## Sample Prioritizations
+
+The current prioritization scheme is a simple set of multipliers for the
+various dimensions. Currently there are three dimensions:
+
+ * Class: eg `interactive`, `db_compact`, `view_update`, etc
+ * User: the user making the request, eg `<<"foo">>`
+ * Shard/Class pair: the shard and class for the request
+   - eg `{<<"shards/00000000-1fffffff/foo">>, interactive}`
+   - this allows for things like increased compaction priority on an
+     individual shard outside of the global class multipliers
+
+Behind the scenes, this basically works as follows:
+
+```erlang
+prioritize_request(Req) ->
+    UserPriority = user_priority(Req),
+    ClassPriority = class_priority(Req),
+    ShardClassPriority = shard_class_priority(Req),
+
+    UserPriority * ClassPriority * ShardClassPriority.
+```
+
+The default priority is the identity, 1.0, so in the case where no values are
+defined the product above is simply 1.0 * 1.0 * 1.0 = 1.0. The default class
+priorities are currently defined as follows in `ioq.hrl`:
+
+
+### Default Class Priorities
+
+```erlang
+-define(DEFAULT_CLASS_PRIORITIES, [
+    {customer, 1.0},
+    {internal_repl, 0.001},
+    {view_compact, 0.0001},
+    {db_compact, 0.0001},
+    {low, 0.0001},
+    {db_meta, 1.0},
+
+    {db_update, 1.0},
+    {view_update, 1.0},
+    {other, 1.0},
+    {interactive, 1.0}
+]).
+```
+
+
+## Handling Priority Starvation
+
+One potential problem with the simple floating point based priority queue is
+that lower priority items can become starved given a constant volume of higher
+priority requests. We want to ensure that requests can't get permanently
+starved in this manner, and that work progresses on all fronts in a timely
+fashion. IOQ1 handles this issue by ensuring there's always a random chance low
+priority work will be executed.
+
+In IOQ2 this issue is handled by way of an auto scaling elevator on the
+priority values. What this means is that every `N` requests, IOQ2 will scale
+the existing queued items by a configurable scaling factor. The idea is that
+you automatically increase the priority of all queued items, and if you do that
+enough times then lower priority items will eventually bubble up to the top.
+Behind the scenes hqueue is an array based heap, so we can easily run through
+the array and update the priority of each item. Because every item is scaled
+by the same positive factor, the relative order of the elements is unchanged:
+the heap invariant is preserved and no re-sorting is required.
+
+The default scale factor is currently `2.0`, and the default `N` value for how
+often to scale is currently every `1000` requests. Both of these values are
+config knobs. In general these values seem _ok_, but they're not particularly
+scientific, so we'll want to keep an eye on them over time in a variety of
+workloads.
+
+
+## Intentional Priority Starvation
+
+Initially, IOQ2 and hqueue required all priorities to be greater than zero, but
+this has been switched to be greater than or equal to zero. The motivation here
+is that 0.0 has the cute property of propagating through any multipliers. This
+means a zero value for any of the dimensions will make the other dimensions
+irrelevant. But what's even more interesting is that zero priority values skip
+the auto scaling elevator and will forever be stuck at zero, which provides a
+way to intentionally starve particular work types, or at the very least to
+ensure that they will never be selected unless there is no other work to do.
+This is especially useful for blackballing problematic MT users, or marking a
+particular database as background only work.
+
+
+---
+
+
+# IOQ2 Operations Guide
+
+IOQ2 comes with a feature toggle, and is disabled by default. You can enable it
+with:
+
+
+## Enable IOQ2
+
+```erlang
+ioq_config:set_enabled(true, "Enabling IOQ2").
+```
+
+You can verify it's enabled by checking:
+
+```erlang
+ioq:ioq2_enabled().
+```
+
+
+## Metrics Dashboard
+
+IOQ2 has a dedicated dashboard page on `metrics.cloudant.com` with the
+expected tab name of `IOQ2`.
+
+
+---
+
+
+## Choosing Good Priority Values
+
+Choosing good priority values is going to be a crucial part of tuning IOQ2.
+Unfortunately this is not particularly obvious nor necessarily easy, and it
+will take some experimentation under different workloads to begin establishing
+some best practices. Hopefully folks can start updating this section with some
+useful tips and tricks for different workloads. Below there's more
+documentation on how to validate the priority configs for different request
+types. The primary motivation for adding that logic was to help facilitate
+experimentation with different configuration options, and to aid in
+understanding how the different configurations impact request prioritization.
+
+It will be useful to keep in mind the ranges of priority values. By default all
+interactive/view_update/db_update/other class requests have a priority of
+`1.0`, and assuming no user specific or shard specific configs, those requests
+will have a priority of `1.0`. Similarly, standard background tasks have low
+default priorities: `0.001` for internal replication and `0.0001` for
+compaction. So primary database operations by default have a thousand to ten
+thousand fold prioritization over background tasks. The default bounds of
+prioritization are from `0.0` to `10000.0`, so you have a decent field to
+experiment with, and the upper bound can be configured higher or lower as
+desired.
+
+It's also important to remember the auto scaling elevator logic for prioritized
+requests. Every `N` requests all currently queued requests have their
+priorities linearly scaled, so after a sufficiently long time in the queue, all
+requests (with non `0.0` priorities) will eventually become the top priority
+(assuming constant priorities coming in). The scaling factor can be configured,
+as well as how often to do the scaling.
+
+So let's look at some real world scenarios where you would want to change
+prioritization. Let's start with a simple one: what to do when a node is at 95%
+disk space? This is an easy one! Just blast compaction priority. Unlike IOQ1,
+IOQ2 does not differentiate between request types in terms of selecting the
+next piece of work; it's strictly based on the priority multiplier, so you can
+completely prioritize compaction traffic over standard database operations,
+potentially to the detriment of standard operations performance. So if you set
+the compaction priority multiplier to 10000 you'll prioritize compaction work
+above everything else (assuming default priorities elsewhere). This means that
+as long as there are compaction jobs in the queue those will be handled before
+anything else. This should be a significant win for prioritizing compaction in
+low disk space scenarios.
+
+Now, let's look at a similar, albeit more complicated scenario. Disk space is
+above 85%, and you want to get out ahead of the compaction curve without
+severely impacting cluster performance for standard operations. Cranking
+compaction to the limit will get the job done, but it will also potentially
+induce performance issues for the customer and could starve normal requests.
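To see why a cranked multiplier dominates the queue, consider a toy max-heap (a Python sketch standing in for hqueue, not the NIF itself):

```python
import heapq

# Toy max-priority queue (negated keys on Python's min-heap) standing in for
# hqueue. With the db_compact multiplier cranked to 10000.0 and defaults
# elsewhere, any queued compaction request outranks interactive work.
queue = []

def push(priority, req):
    heapq.heappush(queue, (-priority, req))  # negate: heapq is a min-heap

for i in range(3):
    push(1.0, "interactive")   # default interactive priority
push(10000.0, "db_compact")    # 10000 * 1.0 * 1.0 with default user/shard

_, first = heapq.heappop(queue)  # db_compact comes out before any interactive
```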
+Here you would want to experiment a bit with gradually bumping up the
+compaction priority. Try going from the `0.0001` default to `0.001` and see how
+much that increases compaction throughput. Still not enough? Try `0.01` and
+repeat. Then on to `0.1` and maybe even on to `1.0` to make compaction priority
+level with normal operations.
+
+One other thing to keep in mind here is that these prioritizations are also
+dependent on the overall request volumes. If you've got 10k pending db_update
+requests, and only 5 pending compaction requests, then cranking compaction
+priority is not going to have a massive negative impact on db_update
+throughput. Similarly, if you've only got one compaction job running, you'll
+run into diminishing returns for how effectively you can prioritize compaction
+as there's insufficient request volume to prioritize. You'll need to experiment
+with increasing Smoosh concurrency to get more jobs running and thus more
+queued items to prioritize.
+
+
+## Setting Priority Values
+
+IOQ2 contains utility functions for setting all configuration related values;
+you should *not* need to use `config:set` for any IOQ2 related configuration
+changes. In addition to the various config tunables, there are built-in helpers
+to assist with setting appropriate priorities. All priority values should be
+floating point values, and the described config helpers will prevent you from
+using non floating point values. All of the config helpers here expect a
+`Reason` value to log changes as per standard auditing rules.
+
+*NOTE* the shard priorities do not include the shard suffix in the config names
+to preserve priorities between cycled dbs, so if for whatever reason you
+manually set shard priorities, make sure you use `filename:rootname(ShardName)`
+to drop the suffix so that your config options work as expected.
+
+
+### Setting Class Priority Values
+
+Class specific priority multipliers can be set as demonstrated below. The class
+name should be an Erlang atom, and the value should be a float. For example:
+
+```erlang
+ioq_config:set_class_config(interactive, 3.2, "Insightful motivation").
+ioq_config:set_class_config(db_compact, 0.5, "Moar compactions").
+```
+
+
+### Setting Shard/Class Specific Priority Values
+
+You can prioritize on Shard/Class pairs; there is no shard wide prioritization,
+so you'll need to set each class as appropriate. This function takes a
+`#shard{}` record as returned by `mem3:shards`.
+
+```erlang
+Shard = hd(mem3:shards(DbName)),
+ioq_config:set_shard_config(Shard, db_update, 2.3, "Prioritize db updates").
+```
+
+You _could_ call `set_shard_config` for every shard for a given database, but
+there's a helper for that as well:
+
+
+### Setting Shard/Class Priority for all Shards in a Database
+
+There's a helper function for setting a class priority on all shards for a
+given database name. Similarly to the shard/class configs, you have to specify
+each class priority individually. You can use it as follows:
+
+```erlang
+ioq_config:set_db_config(<<"foo/bar">>, view_update, 0.8, "Build that view").
+```
+
+This is roughly equivalent to:
+
+```erlang
+[set_shard_config(S, Class, Value, Reason) || S <- mem3:shards(DbName)].
+```
+
+
+### Setting User Specific Priority Values
+
+You can set a global multiplier for a particular user to increase or decrease
+the priority of all of their requests. Here's an example:
+
+```erlang
+ioq_config:set_user_config(<<"foo">>, 3.7, "Priority user").
+```
+
+The `set_user_config` helper currently does not validate that the user exists,
+so you'll want to verify that you set the config for the appropriate user.
+
+
+### Verifying Expected Prioritization Matches up with Reality
+
+Ok great, so you've just used the handy helpers to set various priority values,
+but how do you verify it did what you expect? How do you test to see that the
+multipliers result in a prioritization in the desired range? The prioritization
+logic is self contained and can easily be experimented with. You have two
+options: either the `check_priority/3` function, or using the
+`prioritize` function directly. The `check_priority` function is the simple
+approach, and saves you from having to build up the relevant priority data
+structures. It can be used as follows:
+
+```erlang
+User = <<"foo">>,
+DbName = <<"foo/bar">>,
+Shard = hd(mem3:shards(DbName)),
+ioq_config:check_priority(internal_repl, User, Shard).
+```
+
+That will return the floating point prioritization value for that request. You
+can also experiment with your own config options directly by way of the
+`ioq_config:prioritize` function. To demonstrate an example of using this,
+here is the source code for the `check_priority` function used above:
+
+```erlang
+-spec check_priority(atom(), binary(), binary()) -> float().
+check_priority(Class, User, Shard0) ->
+    {ok, ClassP} = build_class_priorities(),
+    {ok, UserP} = build_user_priorities(),
+    {ok, ShardP} = build_shard_priorities(),
+
+    Shard = filename:rootname(Shard0),
+    Req = #ioq_request{
+        user = User,
+        shard = Shard,
+        class = Class
+    },
+
+    prioritize(Req, ClassP, UserP, ShardP).
+```
+
+The `build_*_priorities()` functions are all exported from the `ioq_config`
+module and are directly usable for easy testing. You can also see the full list
+of priority values from those priority data structures like so:
+
+```erlang
+(node1@127.0.0.1)14> khash:to_list(ShardP).
+[{{<<"shards/00000000-1fffffff/foo">>,interactive},1.0e3},
+ {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update},
+  1.5}]
+```
+
+
+---
+
+
+## Other Configuration Knobs
+
+There are a number of other configuration knobs available and they're detailed
+below.
+
+
+### IOQ2 Concurrency
+
+We'll start out with one of the more awkward configuration tunables:
+concurrency! This option is awkward because it fundamentally changes the
+dynamics of IOQ, both IOQ1 and IOQ2. If concurrency is higher than the number
+of parallel requests, then you'll never actually prioritize things and using
+IOQ is a waste. If it's too low then you'll overly bottleneck the system and
+cause a backup of requests.
+
+This awkwardness is further compounded by the fact that IOQ2 is inherently
+concurrent in that it has one IOQ2 pid per scheduler, so you must exercise
+caution with setting the concurrency value! This value is propagated to every
+IOQ2 pid, so setting concurrency is essentially multiplicative, and total
+concurrency will be `concurrency * num_schedulers`. Most of our newer systems
+have 48 cores, and we now have systems with 56 cores, so setting IOQ2
+concurrency to 5 could result in a total concurrency of 250+!!! The
+`ioq:get_disk_concurrency()` function (which calls
+`ioq_server2:get_concurrency()` when IOQ2 is enabled) will aggregate these
+concurrency values together, giving you the full total, so that's useful to
+double check.
+
+Interestingly enough, the best concurrency value found so far through empirical
+means is concurrency=1 per IOQ2 pid! This is the current default, and so on
+machines with 48 cores we end up with a total concurrency of 49. So far the
+default of one has been fairly effective, but given sufficient volume of
+requests it might be worthwhile to bump it up. Start small and try bumping
+it up to two, or maybe three. For example:
+
+```erlang
+ioq_config:set_concurrency(2, "Bumping concurrency").
+```
+
+*NOTE* if you do feel the need to update concurrency, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
+
+
+### IOQ2 Resize Limit
+
+The resize limit value controls how many requests to handle before triggering
+the auto scaling elevator logic described above. This defaults to 1000, and can
+be changed with:
+
+```erlang
+ioq_config:set_resize_limit(5000, "Huge resize limit test").
+```
+
+*NOTE* if you do feel the need to update resize_limit, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
+
+
+### IOQ2 Scale Factor
+
+The scale factor defines the multiplier to use during auto scaling when the
+resize limit is hit. This currently defaults to two, which means that after ten
+auto scale iterations you'll have increased the priority roughly a thousand
+fold (2^10 = 1024) for any requests that have been in the queue for all ten
+cycles. This value may or may not be too aggressive. Setting this to one
+essentially eliminates the auto scaling elevator logic entirely, which is not
+really recommended. You can update it as follows:
+
+```erlang
+ioq_config:set_scale_factor(1.7, "Modifying scale factor").
+```
+
+*NOTE* if you do feel the need to update scale_factor, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
+
+
+### IOQ2 Max Priority
+
+Max priority establishes an upper bound on the priority values. It currently
+defaults to 10000.0. There is also an implicit lower bound on priority values
+of 0.0. Depending on how wild you go with the multipliers, it might be useful
+to increase this value, which can be done with the following:
+
+```erlang
+ioq_config:set_max_priority(55555.0, "Expand priority space").
+```
+
+### IOQ2 Dedupe
+
+Both IOQ1 and IOQ2 have a dedupe feature that will avoid performing the same
+read multiple times in parallel. In IOQ1 this operation scanned through lists
+and could become a considerable resource hog. In IOQ2 this is a simple khash
+lookup and should not be a problem. You should *not* ever need to disable this,
+but if for whatever reason you need to, you can do so with:
+
+```erlang
+ioq_config:set_dedupe(false, "Disable dedupe test").
+```
+
+*NOTE* if you do feel the need to update dedupe, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
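The dedupe idea can be sketched as a simple keyed lookup (Python as a neutral illustration; the key shape and helper names are hypothetical, not the khash API):

```python
# Sketch of read deduping via a keyed hash lookup: identical in-flight reads
# are coalesced under one pending entry, so the IO is issued only once and
# every waiter shares the single result. Names here are illustrative.
pending = {}

def submit(key, waiter):
    """Return True if this call must issue the IO, False if it was deduped."""
    if key in pending:
        pending[key].append(waiter)
        return False
    pending[key] = [waiter]
    return True

def complete(key, result):
    """Deliver the result to every waiter coalesced onto this read."""
    return [(waiter, result) for waiter in pending.pop(key)]

issued = submit(("fd1", 4096), "caller_a")     # first read: issue the IO
deduped = submit(("fd1", 4096), "caller_b")    # same read in flight: coalesce
delivered = complete(("fd1", 4096), b"block")  # both callers get the result
```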
+
+
+### IOQ2 Bypass
+
+IOQ2 has the same bypass logic as IOQ1, however, the whole point of IOQ2 is to
+make a sufficiently performant IOQ that bypasses are *not* necessary. This
+functionality was included in IOQ2 as a backup in case max throughput is
+essential and unreachable with IOQ2. You can set it in the standard manner, but
+in the `ioq2.bypass` namespace, as follows:
+
+```erlang
+ioq_config:set_bypass(interactive, true, "Bypass interactive channel").
+```
+
+*NOTE* if you do feel the need to bypass IOQ2, please do notify
+@chewbranca afterwards so we can observe this in more workloads. Yes, this
+NOTE blurb is in a number of config descriptions, but _please_ do notify
+@chewbranca if you feel the need to bypass anything.
+
+
+### IOQ2 Dispatch Strategy
+
+IOQ2 utilizes many processes to achieve the desired throughput and performance.
+There are several different dispatch strategies for determining how requests are
+funneled through these IOQ pids, and a `single_server` fallback in the event
+only a single IOQ2 server is desired. Changing dispatch strategies is a *safe*
+operation to perform: all the pids already exist and it will just toggle which
+to go through. All active requests will continue to go through the IOQ2 pid they
+were initially handled by, and any new requests will go through the specified
+IOQ2 server. The four current dispatch strategies are:
+
+ * "server_per_scheduler"
+ * "fd_hash"
+ * "random"
+ * "single_server"
+
+
+### Server per Scheduler Dispatch Strategy
+
+```erlang
+ioq_config:set_dispatch_strategy("server_per_scheduler", "Changing dispatch").
+```
+
+This is the default dispatch strategy. IOQ2 creates `N` `ioq_server2` pids, where
+`N` is the number of Erlang VM schedulers on the current system, which defaults
+to the number of CPU cores. This dispatch strategy uses the current
+scheduler of the caller process to determine which IOQ2 server to use. This has
+the nice property of automatically distributing work out across IOQ2 servers
+based on how the Erlang VM is spreading out work across the schedulers. In
+practice this works pretty well and seems reasonable at a high level, but it may
+or may not be optimal for all workloads, which is why we have multiple dispatch
+strategies.
+
+
+### FD Hash Dispatch Strategy
+
+```erlang
+ioq_config:set_dispatch_strategy("fd_hash", "Changing dispatch").
+```
+
+The `fd_hash` dispatch strategy hashes on the couch_file pid the request has as
+a destination, and then ensures that all requests to the same couch_file pid go
+through a single IOQ2 pid. This provides the most control over prioritization of
+requests to individual shards, as _all_ requests to that shard will go through
+the single IOQ2 pid, providing global prioritization rather than localized by
+IOQ2 pid. This can be useful when dealing with overloaded couch_file pids where
+you want to minimize and focus the work sent to those pids. Also, by funneling
+all requests to the same shard through the same IOQ2 pid, this increases the
+opportunity for deduping requests, which can be significant. This dispatch
+strategy can result in uneven distribution of work across IOQ2 pids, so it's not
+appropriate for all situations, but for many dedicated clusters this could be an
+ideal dispatch strategy.
+
+
+### Random Dispatch Strategy
+
+
+```erlang
+ioq_config:set_dispatch_strategy("random", "Changing dispatch").
+```
+
+The `random` dispatch strategy just randomly selects one of the IOQ2 pids to
+send the request to. This dispatch strategy selects pids uniformly at random
+and should result in roughly even work distributed across all IOQ2 pids. This is
+not the default strategy because if there are fewer concurrent requests active
+in the system than total IOQ2 pids, there will not actually be any
+prioritization taking place, in which case the `server_per_scheduler` dispatch
+strategy should be preferred as it will reduce the number of IOQ2 pids in use
+as a function of how much work is on the system.
+
+
+### Single Server Dispatch Strategy
+
+```erlang
+ioq_config:set_dispatch_strategy("single_server", "Changing dispatch").
+```
+
+This is a fallback dispatch strategy that may or may not be removed at some
+point. This utilizes a single IOQ2 pid for _all_ requests, eliminating the
+benefits of parallel IOQ2 pids and inevitably resulting in IOQ2 becoming a
+bottleneck in the same way as IOQ1, albeit a faster bottleneck.
+
+
+---
+
+
+## Finding the pids
+
+The IOQ2 pids per scheduler have registered names of the form `ioq_server_$N`
+where `$N` is the scheduler id, starting from 1. You can get a list of all the
+IOQ2 pids on the current system with the following:
+
+```erlang
+(node1@127.0.0.1)10> ioq_sup:ioq_server_pids().
+[ioq_server_1,ioq_server_2]
+```
+
+
+## ioq_config:ioq_classes
+
+You can see the proper names for all registered IOQ classes with the following:
+
+```erlang
+(node1@127.0.0.1)15> ioq_config:ioq_classes().
+[customer,internal_repl,view_compact,db_compact,low,db_meta,
+ db_update,view_update,other,interactive]
+```
+
+
+## ioq_server2:get_state
+
+You can see a human readable representation of the IOQ2 server state with the
+following block of code. The output is "human readable" in that the khash and
+hqueue data structures have been transformed into lists so the contents can be
+viewed. This fetches the state of the `ioq_server_1` pid. If you want a
+different pid you'll need to manually `gen_server:call` into it.
+
+```erlang
+(node1@127.0.0.1)16> ioq_server2:get_state().
+{state,[],[],[],1,0, + [{view_update,1.0}, + {view_compact,0.0001}, + {db_compact,0.0001}, + {low,0.0001}, + {db_update,1.0}, + {customer,1.0}, + {internal_repl,0.001}, + {interactive,1.0}, + {other,1.0}, + {db_meta,1.0}], + [], + [{{<<"shards/00000000-1fffffff/foo">>,interactive},1.0e3}, + {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update}, + 1.5}], + 2.0,true,1000,1,ioq_server_1,0,normal,1.0e4} +``` + +If you want to have the pretty printed version and be able to fetch the fields +directly, you'll need to include the `ioq_server2` records, for example: + +```erlang +(node1@127.0.0.1)17> rr(ioq_server2), ioq_server2:get_state(). +#state{reqs = [],waiters = [],queue = [],concurrency = 1, + iterations = 0, + class_p = [{view_update,1.0}, + {view_compact,0.0001}, + {db_compact,0.0001}, + {low,0.0001}, + {db_update,1.0}, + {customer,1.0}, + {internal_repl,0.001}, + {interactive,1.0}, + {other,1.0}, + {db_meta,1.0}], + user_p = [], + shard_p = [{{<<"shards/00000000-1fffffff/foo">>,interactive}, + 1.0e3}, + {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update}, + 1.5}], + scale_factor = 2.0,dedupe = true,resize_limit = 1000, + next_key = 1,server_name = ioq_server_1,scheduler_id = 1, + collect_stats = normal,max_priority = 1.0e4} +``` + +To fetch the server state from a particular IOQ2 pid, you can do so with the +following: + +```erlang +(node1@127.0.0.1)18> gen_server:call(ioq_server_2, get_state). 
+#state{reqs = [],waiters = [],queue = [],concurrency = 1, + iterations = 0, + class_p = [{view_update,1.0}, + {view_compact,0.0001}, + {db_compact,0.0001}, + {low,0.0001}, + {db_update,1.0}, + {customer,1.0}, + {internal_repl,0.001}, + {interactive,1.0}, + {other,1.0}, + {db_meta,1.0}], + user_p = [], + shard_p = [{{<<"shards/00000000-1fffffff/foo">>,interactive}, + 1.0e3}, + {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update}, + 1.5}], + scale_factor = 2.0,dedupe = true,resize_limit = 1000, + next_key = 1,server_name = ioq_server_2,scheduler_id = 2, + collect_stats = normal,max_priority = 1.0e4} +``` + + +--- + + +## Gotchas + +Some miscellaneous "gotchas" to be aware of. + + +### Shard Class Configs Drop Suffixes + +By default the shard names include the db file suffix which is the timestamp of +creation time. These suffixes must be dropped from the config entries, +otherwise they will not be picked up in the IOQ2 config. For instance, here's +what the default name looks like, followed by properly truncating the suffix: + +```erlang +(node1@127.0.0.1)21> S#shard.name. +<<"shards/00000000-1fffffff/foo.1503945430">> +(node1@127.0.0.1)22> filename:rootname(S#shard.name). +<<"shards/00000000-1fffffff/foo">> +``` + + +### Shard Configs Persist Through Db Cycles, but not Reshards + +In the `Shard Class Configs Drop Suffixes` "gotcha" above, you'll see that +suffixes are not an allowed part of the shard config keys. The motivation here +is to allow for database configs persisting through cycles, eg deleting and +recreating a database will preserve the config. + +*HOWEVER*, if you delete the database and recreate it with a new sharding +factor, you'll end up with a completely different set of shards and the old +configs will no longer map over and it will essentially reset to the defaults. +This should be obvious given that the shard configs are keyed on the full shard +name, so switching said shard name will result in a different config key. 
+You'll need to manually reset the configs with the appropriate new shards once
+the database has been recreated or resharded.
+
+
+### Non integer/float Priority Configs are Ignored
+
+If you set a config value that is not an integer or floating point value, that
+configuration will be silently ignored and replaced with the default value of
+`1.0`. Check out the docs above about how to verify config expectations. The
+`ioq_config` helpers described above will only allow you to set configs with
+floating point values, so if you only use those this should never be a problem.
+However if you manually set the config values you might run into this. The
+IOQ2 config will attempt to convert integers to floats, but any other values
+will be ignored.
+
+
+### Only a subset of metrics reducers are enabled
+
+The https://metrics.cloudant.com tabs are predominantly powered by Centinela
+reducers that aggregate the node specific metrics into an individual global
+metric. Due to the current precarious state of the metrics stack, the IOQ2 work
+has been cautious with the introduction of new metrics. Only the reduced
+metrics currently enabled on the IOQ2 tab have reducers enabled. If you need
+additional reducers you'll need to add them. For example, the iowait metrics
+are reduced at the median but not at P99.9:
+
+```
+(chewbranca)-(jobs:0)-(~)
+(! 14529)-> acurl
+https://cloudant.cloudant.com/metrics/couchdb.io_queue2.iowait.percentile.50
+{"_id":"couchdb.io_queue2.iowait.percentile.50","_rev":"1-e0ad9472a61b9a46ace4c9852ce63a36","reduce":true,"reducers":["mean"]}
+
+(chewbranca)-(jobs:0)-(~)
+(! 14530)-> acurl
+https://cloudant.cloudant.com/metrics/couchdb.io_queue2.iowait.percentile.999
+{"error":"not_found","reason":"missing"}
+```
+
+
+### IOQ2 Concurrency is Multiplicative
+
+This is covered in depth in the concurrency sections above, but this is an
+important enough point to warrant calling it out here as well. With IOQ1,
+concurrency is singular for the one IOQ1 pid, but in IOQ2 concurrency is set
+for *every* IOQ2 pid. This means you should realistically never set concurrency
+above single digits for IOQ2. Setting concurrency to five on a system with 56
+CPU cores will result in a total concurrency of over 250, which is probably not
+productive.
+
+
+### IOQ2 Can Only Prioritize Work when there's a Backlog of Work
+
+As mentioned above, IOQ is inherently a bottleneck, otherwise it isn't actually
+able to prioritize any work. On a similar note, if there's an insufficient
+volume of a particular request type, you won't be able to significantly
+influence the throughput of that type. For instance, you can bump compaction
+priority to the moon, but if there's only one compaction job you're not going
+to make a significant difference to the volume of compaction requests.
+
+The other side of this is that IOQ2 is only effective at prioritizing work when
+there's a variety of work types. If you've got a problematic MT user that you
+want to back burner, you can set the user priority to `0.0` and all their
+requests will be prioritized at `0.0` and will never benefit from the auto
+scaling elevator. That said, if there are _no_ other users making requests to
+the system, then that user's requests will still be chugging along as fast as
+they come in. IOQ2 is *NOT* a rate limiting system; it's a prioritization
+system that prioritizes requests relative to all other pending requests.
+Without sufficient work it's essentially just a pass through.
+
+
+---
+
+
+## Request for feedback
+
+IOQ and IOQ2 are complicated beasts, and there are a lot of tunable knobs here.
+It is expected that we'll need to experiment with different levels for
+different workloads, so please do be diligent about informing @chewbranca of
+situations where you've had to change any of the configuration options above
+that request notification. Any other thoughts/comments/suggestions welcome as
+well.
+Similarly, feedback is welcome on the IOQ2 metrics tab as well.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ccd676c
--- /dev/null
+++ b/README.md
@@ -0,0 +1,34 @@
+### IOQ classes
+The following is the list of IOQ classes:
+
+* interactive
+* db_update
+* view_update
+* db_compact
+* view_compact
+* internal_repl
+* low
+
+
+### Bypassing IOQ
+One can configure an ioq bypass, which removes an IO class from prioritization,
+as below:
+
+    config:set("ioq.bypass", "view_update", "true")
+
+Note that setting an IOQ bypass effectively lets that class trump all other
+classes, especially in the case of an interactive bypass v. compaction. This
+can lead to high disk usage.
+
+### Setting priorities
+The priority for a class can also be set as follows:
+
+    config:set("ioq", "compaction", "0.3")
+
+Or globally, using snippet/rpc:
+
+    s:set_config("ioq", "compaction", "0.314", global)
+    rpc:multicall(config, set, ["ioq", "compaction", "0.217"])
+
+As the interactive class is 'everything else' its priority cannot be directly
+set.
diff --git a/include/ioq.hrl b/include/ioq.hrl
new file mode 100644
index 0000000..55c95f8
--- /dev/null
+++ b/include/ioq.hrl
@@ -0,0 +1,71 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(DEFAULT_PRIORITY, 1.0).
+-define(BAD_MAGIC_NUM, -12341234).
+
+%% Dispatch Strategies
+-define(DISPATCH_RANDOM, "random").
+-define(DISPATCH_FD_HASH, "fd_hash").
+-define(DISPATCH_SINGLE_SERVER, "single_server").
+-define(DISPATCH_SERVER_PER_SCHEDULER, "server_per_scheduler"). + +-define(DEFAULT_CLASS_PRIORITIES, [ + {customer, 1.0}, + {internal_repl, 0.001}, + {view_compact, 0.0001}, + {db_compact, 0.0001}, + {low, 0.0001}, + {db_meta, 1.0}, + + {db_update, 1.0}, + {view_update, 1.0}, + {other, 1.0}, + {interactive, 1.0} +]). + + +-record(ioq_request, { + fd, + msg, + key, + init_priority = 1.0, + fin_priority, + ref, + from, + t0, + tsub, + shard, + user, + db, + class, + ddoc +}). + + +-type io_priority() :: db_compact + | db_update + | interactive + | internal_repl + | other + | customer + | db_meta + | low. +-type view_io_priority() :: view_compact + | view_update. +-type dbcopy_string() :: string(). %% "dbcopy" +-type dbname() :: binary() | dbcopy_string(). +-type group_id() :: any(). +-type io_dimensions() :: {io_priority(), dbname()} + | {view_io_priority(), dbname(), group_id()}. +-type ioq_request() :: #ioq_request{}. + diff --git a/operator_guide.md b/operator_guide.md new file mode 100644 index 0000000..81acdcc --- /dev/null +++ b/operator_guide.md @@ -0,0 +1,231 @@ +# An operator's guide to IOQ + +IOQ handles the prioritisation of IO operations in the database. It has +two main responsibilities: + + 1. Providing configurable prioritisation of interactive requests and + background requests such as compaction and internal replication. + 2. Providing equal prioritisation for interactive requests by backend/database. + +From an operational perspective point 1 is of most interest as it provides a set +of levers that can be pulled to change the behaviour of the cluster in favour +of particular workloads or operational concerns. + +## Basic overview + +From an operational point-of-view, IOQ carries out two fundamental operations: + + 1. Enqueueing requests into one of a number of available channels. + 2. Selecting and submitting a request from the available channels according + to configured priorities. + +IOQ categorises IO requests by class and by priority. 
The class of a request +dictates the channel into which it will be enqueued and the priority influences +the probability that a given request will be dequeued and executed. + +The following table lists the IOQ classes and the corresponding priorities. Note +that the mapping of IOQ classes to class priorities is not 1:1. + +``` +|---------------+---------------+--------------------------------------------| +| IOQ class | IOQ priority | Description | +|---------------+---------------+--------------------------------------------| +| interactive | reads, writes | IO requests related to requests made by | +| | | users via the http layer. | +| | | | +| db_update | writes | Interactive IO requests which are database | +| | | write operations. | +| | | | +| view_update | views | IO requests related to view index builds. | +| | | | +| db_compact | compaction | IO requests related to database | +| | | compactions. | +| | | | +| view_compact | compaction | IO requests related to view compactions. | +| | | | +| internal_repl | replication | IO requests related to internal | +| | | replication. | +| | | | +| low | low | IO requests related to requests made by | +| | | users via the http layer where the | +| | | "x-cloudant-priority: low" header is set. | +| | | | +| other | undefined | IO requests that do not fit any of the | +| | | above classes. This includes search IO | +| | | requests. | +|---------------+---------------+--------------------------------------------| +``` + +## Internals + +To understand the relationship between the IOQ classes and the IOQ priorities +it is helpful to understand the channels into which IO requests are enqueued. 
+
+IOQ uses the following four channels:
+
+ - `Compaction`
+ - `Internal replication`
+ - `Low`
+ - `Customer`
+
+The `Customer` channel is effectively a meta-channel where each item in the
+queue represents a backend/dbname combination that consists of a further three
+channels:
+
+ - `Interactive`
+ - `DB update`
+ - `View update`
+
+Requests are enqueued according to the following scheme:
+
+ - Requests with class `internal_repl` or `low` are enqueued into the
+   `Internal replication` and `Low` channels respectively, while requests with
+   class `db_compact` or `view_compact` both land in the `Compaction` channel.
+ - Requests with class `interactive`, `db_update` or `view_update` are enqueued
+   into the `Interactive`, `DB update` or `View update` channel of the relevant
+   `Customer` channel for the backend/database combination.
+ - Requests with class `other` are enqueued into the `Interactive` queue of a
+   `Customer` channel reserved for `other` IOQ requests.
+
+Requests are submitted as follows:
+
+ - The next item is selected from either the `Compaction`,
+   `Internal replication`, `Low` or `Customer` channel according to the
+   configured priorities (`compaction`, `replication`, `low` and `customer`).
+ - If the item is obtained from the `Compaction`, `Internal replication` or
+   `Low` channels then the request is submitted for execution.
+ - If the item is obtained from the `Customer` channel then the request is
+   selected from either the `Interactive`, `DB update` or `View update` channel
+   according to the configured priorities (`reads`, `writes`, and `views`).
+
+## Configuration
+
+Unless there is prior knowledge of the IOQ configuration required to support the
+intended workload of a cluster on a given hardware specification, it is
+recommended that IOQ is initially left with the default configuration values. As
+more becomes known about the behaviour of a cluster under load the IOQ settings
+can be tuned to provide optimal performance for the production workload.
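+
+The submission scheme described in the Internals section above amounts to a
+two-level weighted random selection: first a top-level channel is chosen, then,
+if the `Customer` channel wins, one of its sub-channels. The sketch below is
+purely illustrative - the function names and shapes are assumptions for
+exposition, not the actual IOQ implementation:
+
+    %% Illustrative sketch: pick a top-level channel by weight, then, if the
+    %% Customer channel wins, pick one of its sub-channels by weight.
+    next_channel() ->
+        Top = [{compaction, prio("compaction")},
+               {replication, prio("replication")},
+               {low, prio("low")},
+               {customer, prio("customer")}],
+        case weighted_pick(Top) of
+            customer ->
+                Sub = [{reads, prio("reads")},
+                       {writes, prio("writes")},
+                       {views, prio("views")}],
+                {customer, weighted_pick(Sub)};
+            Channel ->
+                Channel
+        end.
+
+    prio(Key) ->
+        ioq_config:to_float(config:get("ioq", Key, "1.0")).
+
+    %% Select an entry with probability proportional to its weight.
+    weighted_pick(Weighted) ->
+        Total = lists:sum([W || {_, W} <- Weighted]),
+        pick(couch_rand:uniform() * Total, Weighted).
+
+    pick(_, [{Name, _W}]) ->
+        Name;
+    pick(R, [{Name, W} | _]) when R =< W ->
+        Name;
+    pick(R, [{_, W} | Rest]) ->
+        pick(R - W, Rest).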
+ +Note that tuning IOQ is not the answer to all performance problems and there are +a finite number of gains to be had (possibly zero). You should also be +considering the total load on the cluster, the capabilities of the underlying +hardware and the usage patterns and design of the applications which sit on top +of the data layer. + +### Priorities + +IOQ ships with a default configuration which gives interactive reads/writes and +view builds a high priority (`1.0`) and the background requests a much lower +priority (`0.001` for compaction and `0.0001` for replication and low). + +You can set the priorities to other values using the config app in a remsh as +follows: + + config:set("ioq", "views", "0.5", "FBXXXXX reduce views IOQ priority"). + +To return to the default value just delete the configuration value: + + config:delete("ioq", "views", "FBXXXXX revert to default priority"). + +The following sections describe typical situations where tuning IOQ priorities +might be appropriate. + +#### Internal replication backlog + +If cluster nodes are frequently exhibiting an internal replication backlog +then it might be worth increasing the `replication` priority. + +A backlog can be confirmed by checking the following graphite target: + + net.cloudant.cluster001.db*.erlang.internal_replication_jobs + +If this value is consistently elevated by more than a few hundred changes then +try increasing the `replication` IOQ priority: + + config:set("ioq", "replication", "0.5", "FBXXXXX speed up internal replication"). + +If this has been effective you should notice a change in the rate at which the +metric decreases. It is worth experimenting with values as high as `1.0` however +you will need to keep an eye on HTTP request latencies to make sure there is no +adverse impact on other aspects of cluster performance. 
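+
+Before and after making a change like this it can be useful to confirm the
+value currently in effect from the same remsh (`config:get/2` returns the raw
+string value, or `undefined` when the built-in default is in use):
+
+    config:get("ioq", "replication").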
+
+#### Compactions not completing quickly enough
+
+If disk usage is rising on cluster nodes and there is a corresponding backlog
+in compaction work then it might be worth increasing the `compaction` priority.
+
+Check the volume of pending changes for ongoing compaction jobs in graphite:
+
+    net.cloudant.cluster001.db1.dbcore.active_tasks.changes_pending.*compaction
+
+Increase the priority for `compaction`:
+
+    config:set("ioq", "compaction", "0.5", "FBXXXXX speed up compaction").
+
+Now monitor the changes_pending metrics to see if the rate at which changes are
+being processed has increased.
+
+The notes in the previous section apply here - experiment with values as high
+as "1.0" if necessary and keep a close eye on cluster performance whilst you
+do so.
+
+#### Interactive requests and views competing for IO resource
+
+Metrics might show that read/write performance worsens when views are building
+or conversely that view build performance slows when read/write load increases.
+If the performance requirements of the cluster are such that a particular
+type of request is more critical to the application it supports then it might be
+worth reducing the other IOQ priorities, for example:
+
+    config:set("ioq", "views", "0.1", "FBXXXXX attempt to improve read/write performance").
+
+### Concurrency
+
+The concurrency setting defines the total number of concurrent IO operations
+allowed by IOQ. The default value is `20`, however it can be worth increasing
+if the answer to both of the following questions is yes:
+
+ 1. Either `net.cloudant.cluster001.db1.erlang.io_queue.active_requests` or
+    `net.cloudant.cluster001.db1.couchdb.io_queue.latency` is consistently
+    elevated.
+
+ 2. Disk utilisation is significantly less than 100%.
+
+If performance is being impacted by requests waiting in the queues then it is
+worth bumping IOQ concurrency (sensible values to try are `30`, `50` and `100`)
+and observing the resulting effect.
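+
+For example, to try the first of those values, following the same `config:set`
+pattern as the earlier examples:
+
+    config:set("ioq", "concurrency", "30", "FBXXXXX experiment with higher IOQ concurrency").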
+
+Note that increasing this value beyond a certain point can result in the disks
+being overloaded and overall performance degradation. The exact point depends
+on the cluster workload and hardware so it is very important to monitor the
+cluster when making changes here.
+
+### Bypasses
+
+In extreme cases it is possible that IOQ itself is the bottleneck for certain
+request classes. If this is the case then you can bypass IOQ for that request
+class altogether, e.g. for interactive requests:
+
+    config:set("ioq.bypass", "interactive", "true", "FBXXXXX attempt to improve interactive performance").
+
+Note that bypasses are set for IOQ *classes* not IOQ priorities. This means if
+you wanted to bypass all compaction requests you would need to set a bypass for
+`db_compact` and `view_compact`.
+
+The following warnings should be heeded when considering setting an IOQ bypass:
+
+ - Other request classes will continue to be routed through IOQ so will not
+   be able to compete with the bypassed requests. You should therefore monitor
+   the cluster carefully to determine that overall performance is acceptable.
+   Keep a close eye on compaction in particular (unless it is being bypassed)
+   because if the rate of compaction slows too much the disk may start filling
+   up.
+
+ - The bypass effectively shifts the bottleneck to another part of the system
+   which is typically evident in `couch_file` and `couch_db_updater` message
+   queue backups.
+
+ - The disks may also become saturated, which can lead to further performance
+   degradation.
+
+A good rule of thumb is to avoid IOQ bypasses altogether unless the customer
+is in immediate pain.
diff --git a/priv/stats_descriptions.cfg b/priv/stats_descriptions.cfg
new file mode 100644
index 0000000..d5b15f4
--- /dev/null
+++ b/priv/stats_descriptions.cfg
@@ -0,0 +1,230 @@
+{[couchdb, io_queue, latency], [
+    {type, histogram},
+    {desc, <<"delay introduced by routing request through IO queue">>}
+]}.
+{[couchdb, io_queue, low], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO at low priority">>}
+]}.
+{[couchdb, io_queue, merged], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO queue that were merged">>}
+]}.
+{[couchdb, io_queue, osproc], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO os queue">>}
+]}.
+{[couchdb, io_queue, reads], [
+    {type, counter},
+    {desc, <<"number of read requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, writes], [
+    {type, counter},
+    {desc, <<"number of write requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, undefined], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO queue without I/O class">>}
+]}.
+{[couchdb, io_queue, unknown], [
+    {type, counter},
+    {desc, <<"number of unknown requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, db_update], [
+    {type, counter},
+    {desc, <<"DB update requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, db_compact], [
+    {type, counter},
+    {desc, <<"DB compaction requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, view_compact], [
+    {type, counter},
+    {desc, <<"view compaction requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, view_update], [
+    {type, counter},
+    {desc, <<"view indexing requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, interactive], [
+    {type, counter},
+    {desc, <<"IO directly triggered by client requests">>}
+]}.
+{[couchdb, io_queue, internal_repl], [
+    {type, counter},
+    {desc, <<"IO related to internal replication">>}
+]}.
+{[couchdb, io_queue, other], [
+    {type, counter},
+    {desc, <<"IO that does not fit any other class">>}
+]}.
+{[couchdb, io_queue_bypassed, low], [
+    {type, counter},
+    {desc, <<"number of requests that bypassed IO at low priority">>}
+]}.
+{[couchdb, io_queue_bypassed, merged], [
+    {type, counter},
+    {desc, <<"number of requests that bypassed IO queue that were merged">>}
+]}.
+{[couchdb, io_queue_bypassed, osproc], [
+    {type, counter},
+    {desc, <<"number of requests that bypassed IO os queue">>}
+]}.
+{[couchdb, io_queue_bypassed, reads], [
+    {type, counter},
+    {desc, <<"number of read requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, writes], [
+    {type, counter},
+    {desc, <<"number of write requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, undefined], [
+    {type, counter},
+    {desc, <<"number of requests that bypassed IO queue without I/O class">>}
+]}.
+{[couchdb, io_queue_bypassed, unknown], [
+    {type, counter},
+    {desc, <<"number of unknown requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, db_update], [
+    {type, counter},
+    {desc, <<"DB update requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, db_compact], [
+    {type, counter},
+    {desc, <<"DB compaction requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, view_compact], [
+    {type, counter},
+    {desc, <<"view compaction requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, view_update], [
+    {type, counter},
+    {desc, <<"view indexing requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, interactive], [
+    {type, counter},
+    {desc, <<"bypassed IO directly triggered by client requests">>}
+]}.
+{[couchdb, io_queue_bypassed, internal_repl], [
+    {type, counter},
+    {desc, <<"bypassed IO related to internal replication">>}
+]}.
+{[couchdb, io_queue_bypassed, other], [
+    {type, counter},
+    {desc, <<"bypassed IO that does not fit any other class">>}
+]}.
+
+
+{[couchdb, io_queue2, io_errors], [
+    {type, counter},
+    {desc, <<"number of IO errors">>}
+]}.
+{[couchdb, io_queue2, merged], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO queue that were merged">>}
+]}.
+
+{[couchdb, io_queue2, submit_delay], [
+    {type, histogram},
+    {desc, <<"delay introduced by routing request through IO queue">>}
+]}.
+{[couchdb, io_queue2, svctm], [ + {type, histogram}, + {desc, <<"time taken to service the IO request">>} +]}. +{[couchdb, io_queue2, iowait], [ + {type, histogram}, + {desc, <<"Total time request spent waiting on IO">>} +]}. + +{[couchdb, io_queue2, low, count], [ + {type, counter}, + {desc, <<"number of requests routed through IO at low priority">>} +]}. +{[couchdb, io_queue2, osproc, count], [ + {type, counter}, + {desc, <<"number of requests routed through IO os queue">>} +]}. +{[couchdb, io_queue2, reads, count], [ + {type, counter}, + {desc, <<"number of read requests routed through IO queue">>} +]}. +{[couchdb, io_queue2, writes, count], [ + {type, counter}, + {desc, <<"number of write requests routed through IO queue">>} +]}. +{[couchdb, io_queue2, undefined, count], [ + {type, counter}, + {desc, <<"number of requests routed through IO queue without I/O class">>} +]}. +{[couchdb, io_queue2, unknown, count], [ + {type, counter}, + {desc, <<"number of unknown requests routed through IO queue">>} +]}. +{[couchdb, io_queue2, db_update, count], [ + {type, counter}, + {desc, <<"DB update requests routed through IO queue">>} +]}. +{[couchdb, io_queue2, db_compact, count], [ + {type, counter}, + {desc, <<"DB compaction requests routed through IO queue">>} +]}. +{[couchdb, io_queue2, view_compact, count], [ + {type, counter}, + {desc, <<"view compaction requests routed through IO queue">>} +]}. +{[couchdb, io_queue2, view_update, count], [ + {type, counter}, + {desc, <<"view indexing requests routed through IO queue">>} +]}. +{[couchdb, io_queue2, interactive, count], [ + {type, counter}, + {desc, <<"IO directly triggered by client requests">>} +]}. +{[couchdb, io_queue2, db_meta, count], [ + {type, counter}, + {desc, <<"IO related to db_meta">>} +]}. +{[couchdb, io_queue2, internal_repl, count], [ + {type, counter}, + {desc, <<"IO related to internal replication">>} +]}. 
+{[couchdb, io_queue2, other, count], [
+    {type, counter},
+    {desc, <<"IO that does not fit any other class">>}
+]}.
+
+{[couchdb, io_queue2, bypassed_count], [
+    {type, counter},
+    {desc, <<"number of requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue2, reads, bypassed_count], [
+    {type, counter},
+    {desc, <<"number of read requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue2, writes, bypassed_count], [
+    {type, counter},
+    {desc, <<"number of write requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue2, unknown, bypassed_count], [
+    {type, counter},
+    {desc, <<"number of unknown requests that bypassed IO queue">>}
+]}.
+
+{[couchdb, io_queue2, reads, queued], [
+    {type, counter},
+    {desc, <<"number of read requests queued into IO queue">>}
+]}.
+{[couchdb, io_queue2, writes, queued], [
+    {type, counter},
+    {desc, <<"number of write requests queued into IO queue">>}
+]}.
+{[couchdb, io_queue2, unknown, queued], [
+    {type, counter},
+    {desc, <<"number of unknown requests queued into IO queue">>}
+]}.
+{[couchdb, io_queue2, queued], [
+    {type, counter},
+    {desc, <<"number of requests queued into IO">>}
+]}.
diff --git a/rebar.config b/rebar.config
new file mode 100644
index 0000000..b25fdb5
--- /dev/null
+++ b/rebar.config
@@ -0,0 +1,11 @@
+{deps, [
+    {proper, ".*", {git, "https://github.com/manopapad/proper.git", "master"}}
+]}.
+
+{eunit_opts, [
+    verbose,
+    {report, {
+        eunit_surefire, [{dir,"."}]
+    }}
+]}.
+
diff --git a/src/ioq.app.src b/src/ioq.app.src
index 65ea50d..04310bb 100644
--- a/src/ioq.app.src
+++ b/src/ioq.app.src
@@ -11,11 +11,14 @@
 % the License.
{application,ioq, [ - {description, "I/O prioritizing engine"}, + {description, "IO request management in a multi-tenant Erlang VM"}, {vsn, git}, {registered,[]}, - {applications,[kernel,stdlib,config]}, + {applications,[kernel,stdlib,config,couch_stats,hqueue]}, {mod,{ioq_app,[]}}, - {env, []}, - {modules,[ioq,ioq_app,ioq_sup]} + {env, [ + {stats_db, "stats"}, + {stats_interval, 60000} + ]}, + {modules,[ioq,ioq_app,ioq_osq,ioq_server,ioq_sup]} ]}. diff --git a/src/ioq.erl b/src/ioq.erl index 9ca2656..160a448 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -11,148 +11,66 @@ % the License. -module(ioq). --behaviour(gen_server). --behaviour(config_listener). +-export([start/0, stop/0, call/3, call/4, set_disk_concurrency/1, + get_disk_queues/0, get_osproc_queues/0, get_osproc_requests/0, + get_disk_counters/0, get_disk_concurrency/0]). +-export([ + ioq2_enabled/0 +]). --export([start_link/0, call/3]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). +-define(APPS, [config, folsom, couch_stats, ioq]). -% config_listener api --export([handle_config_change/5, handle_config_terminate/3]). +start() -> + lists:foldl(fun(App, _) -> application:start(App) end, ok, ?APPS). --define(RELISTEN_DELAY, 5000). +stop() -> + lists:foldr(fun(App, _) -> ok = application:stop(App) end, ok, ?APPS). --record(state, { - concurrency, - ratio, - interactive=queue:new(), - compaction=queue:new(), - running=[] -}). - --record(request, { - fd, - msg, - priority, - from, - ref -}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +call(Fd, Request, Arg, Priority) -> + call(Fd, {Request, Arg}, Priority). 
+call(Pid, {prompt, _} = Msg, Priority) -> + ioq_osq:call(Pid, Msg, Priority); +call(Pid, {data, _} = Msg, Priority) -> + ioq_osq:call(Pid, Msg, Priority); call(Fd, Msg, Priority) -> - Request = #request{fd=Fd, msg=Msg, priority=Priority, from=self()}, - try - gen_server:call(?MODULE, Request, infinity) - catch - exit:{noproc,_} -> - gen_server:call(Fd, Msg, infinity) + case ioq2_enabled() of + false -> ioq_server:call(Fd, Msg, Priority); + true -> ioq_server2:call(Fd, Msg, Priority) end. -init(_) -> - ok = config:listen_for_changes(?MODULE, nil), - State = #state{}, - {ok, read_config(State)}. - -read_config(State) -> - Ratio = list_to_float(config:get("ioq", "ratio", "0.01")), - Concurrency = list_to_integer(config:get("ioq", "concurrency", "10")), - State#state{concurrency=Concurrency, ratio=Ratio}. - -handle_call(#request{}=Request, From, State) -> - {noreply, enqueue_request(Request#request{from=From}, State), 0}. - -handle_cast(change, State) -> - {noreply, read_config(State)}; -handle_cast(_Msg, State) -> - {noreply, State}. 
- -handle_info({Ref, Reply}, State) -> - case lists:keytake(Ref, #request.ref, State#state.running) of - {value, Request, Remaining} -> - erlang:demonitor(Ref, [flush]), - gen_server:reply(Request#request.from, Reply), - {noreply, State#state{running=Remaining}, 0}; - false -> - {noreply, State, 0} +set_disk_concurrency(C) when is_integer(C), C > 0 -> + case ioq2_enabled() of + false -> gen_server:call(ioq_server, {set_concurrency, C}); + true -> ioq_server2:set_concurrency(C) end; -handle_info({'DOWN', Ref, _, _, Reason}, State) -> - case lists:keytake(Ref, #request.ref, State#state.running) of - {value, Request, Remaining} -> - gen_server:reply(Request#request.from, {'EXIT', Reason}), - {noreply, State#state{running=Remaining}, 0}; - false -> - {noreply, State, 0} - end; -handle_info(restart_config_listener, State) -> - ok = config:listen_for_changes(?MODULE, nil), - {noreply, State}; -handle_info(timeout, State) -> - {noreply, maybe_submit_request(State)}. - -handle_config_change("ioq", _, _, _, _) -> - {ok, gen_server:cast(?MODULE, change)}; -handle_config_change(_, _, _, _, _) -> - {ok, nil}. +set_disk_concurrency(_) -> + erlang:error(badarg). -handle_config_terminate(_Server, stop, _State) -> - ok; -handle_config_terminate(_Server, _Reason, _State) -> - erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener). +get_disk_concurrency() -> + case ioq2_enabled() of + false -> gen_server:call(ioq_server, get_concurrency); + true -> ioq_server2:get_concurrency() + end. -code_change(_Vsn, State, _Extra) -> - {ok, State}. +get_disk_queues() -> + case ioq2_enabled() of + false -> gen_server:call(ioq_server, get_queue_depths); + true -> ioq_server2:get_queue_depths() + end. -terminate(_Reason, _State) -> - ok. +get_disk_counters() -> + case ioq2_enabled() of + false -> gen_server:call(ioq_server, get_counters); + true -> ioq_server2:get_counters() + end. 
-enqueue_request(#request{priority={db_compact, _}}=Request, #state{}=State) -> - State#state{compaction=queue:in(Request, State#state.compaction)}; -enqueue_request(#request{priority={view_compact, _, _}}=Request, #state{}=State) -> - State#state{compaction=queue:in(Request, State#state.compaction)}; -enqueue_request(#request{}=Request, #state{}=State) -> - State#state{interactive=queue:in(Request, State#state.interactive)}. +get_osproc_queues() -> + gen_server:call(ioq_osq, get_queue_depths). -maybe_submit_request(#state{concurrency=Concurrency, running=Running}=State) - when length(Running) < Concurrency -> - case make_next_request(State) of - State -> - State; - NewState when length(Running) >= Concurrency - 1 -> - NewState; - NewState -> - maybe_submit_request(NewState) - end; -maybe_submit_request(State) -> - State. +get_osproc_requests() -> + gen_server:call(ioq_osq, get_requests). -make_next_request(#state{}=State) -> - case {queue:is_empty(State#state.compaction), queue:is_empty(State#state.interactive)} of - {true, true} -> - State; - {true, false} -> - choose_next_request(#state.interactive, State); - {false, true} -> - choose_next_request(#state.compaction, State); - {false, false} -> - case couch_rand:uniform() < State#state.ratio of - true -> - choose_next_request(#state.compaction, State); - false -> - choose_next_request(#state.interactive, State) - end - end. - -choose_next_request(Index, State) -> - case queue:out(element(Index, State)) of - {empty, _} -> - State; - {{value, Request}, Q} -> - submit_request(Request, setelement(Index, State, Q)) - end. +ioq2_enabled() -> + config:get_boolean("ioq2", "enabled", false). -submit_request(#request{}=Request, #state{}=State) -> - Ref = erlang:monitor(process, Request#request.fd), - Request#request.fd ! {'$gen_call', {self(), Ref}, Request#request.msg}, - State#state{running = [Request#request{ref=Ref} | State#state.running]}. 
diff --git a/src/ioq_app.erl b/src/ioq_app.erl index 2e6d75a..f24dcbb 100644 --- a/src/ioq_app.erl +++ b/src/ioq_app.erl @@ -15,6 +15,7 @@ -export([start/2, stop/1]). start(_StartType, _StartArgs) -> + ok = ioq_kv:init(), ioq_sup:start_link(). stop(_State) -> diff --git a/src/ioq_config.erl b/src/ioq_config.erl new file mode 100644 index 0000000..9cc2371 --- /dev/null +++ b/src/ioq_config.erl @@ -0,0 +1,314 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_config). + + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("ioq/include/ioq.hrl"). + + +-export([ + build_shard_priorities/0, + build_shard_priorities/1, + build_user_priorities/0, + build_user_priorities/1, + build_class_priorities/0, + build_class_priorities/1, + add_default_class_priorities/1, + to_float/1, + to_float/2, + parse_shard_string/1, + ioq_classes/0, + is_valid_class/1 +]). +-export([ + prioritize/4, + check_priority/3 +]). +-export([ + set_db_config/4, + set_shards_config/4, + set_shard_config/4, + set_class_config/3, + set_user_config/3 +]). +-export([ + set_bypass/3, + set_enabled/2, + set_max_priority/2, + set_dedupe/2, + set_scale_factor/2, + set_resize_limit/2, + set_concurrency/2, + set_dispatch_strategy/2 +]). + + +-define(SHARD_CLASS_SEPARATOR, "||"). +-define(IOQ2_CONFIG, "ioq2"). +-define(IOQ2_BYPASS_CONFIG, "ioq2.bypass"). +-define(IOQ2_SHARDS_CONFIG, "ioq2.shards"). +-define(IOQ2_USERS_CONFIG, "ioq2.users"). +-define(IOQ2_CLASSES_CONFIG, "ioq2.classes"). 
+ + +ioq_classes() -> + [Class || {Class, _Priority} <- ?DEFAULT_CLASS_PRIORITIES]. + + +set_bypass(Class, Value, Reason) when is_atom(Class), is_boolean(Value) -> + true = is_valid_class(Class), + set_config(?IOQ2_BYPASS_CONFIG, atom_to_list(Class), atom_to_list(Value), Reason). + + +set_enabled(Value, Reason) when is_boolean(Value) -> + set_config(?IOQ2_CONFIG, "enabled", atom_to_list(Value), Reason). + + +set_max_priority(Value, Reason) when is_float(Value) -> + set_config(?IOQ2_CONFIG, "max_priority", Value, Reason). + + +set_dedupe(Value, Reason) when is_boolean(Value) -> + set_config(?IOQ2_CONFIG, "dedupe", atom_to_list(Value), Reason). + + +set_scale_factor(Value, Reason) when is_float(Value) -> + set_config(?IOQ2_CONFIG, "scale_factor", float_to_list(Value), Reason). + + +set_resize_limit(Value, Reason) when is_integer(Value) -> + set_config(?IOQ2_CONFIG, "resize_limit", integer_to_list(Value), Reason). + + +set_concurrency(Value, Reason) when is_integer(Value) -> + set_config(?IOQ2_CONFIG, "concurrency", integer_to_list(Value), Reason). + + +set_dispatch_strategy(Value, Reason) -> + ErrorMsg = "Dispatch strategy must be one of " + "random, fd_hash, server_per_scheduler, or single_server.", + ok = case Value of + ?DISPATCH_RANDOM -> ok; + ?DISPATCH_FD_HASH -> ok; + ?DISPATCH_SINGLE_SERVER -> ok; + ?DISPATCH_SERVER_PER_SCHEDULER -> ok; + _ -> throw({badarg, ErrorMsg}) + end, + config:set(?IOQ2_CONFIG, "dispatch_strategy", Value, Reason). + + +set_db_config(DbName, Class, Value, Reason) when is_binary(DbName) -> + ok = check_float_value(Value), + ok = set_shards_config(mem3:shards(DbName), Class, Value, Reason). + + +set_shards_config(Shards, Class, Value, Reason) -> + ok = check_float_value(Value), + ok = lists:foreach(fun(Shard) -> + ok = set_shard_config(Shard, Class, Value, Reason) + end, Shards). 
+ + +set_shard_config(#shard{name=Name0}, Class0, Value, Reason) when is_atom(Class0) -> + ok = check_float_value(Value), + true = is_valid_class(Class0), + Name = binary_to_list(filename:rootname(Name0)), + Class = atom_to_list(Class0), + ConfigName = Name ++ ?SHARD_CLASS_SEPARATOR ++ Class, + ok = set_config(?IOQ2_SHARDS_CONFIG, ConfigName, Value, Reason). + + +set_class_config(Class, Value, Reason) when is_atom(Class)-> + ok = check_float_value(Value), + true = is_valid_class(Class), + ok = set_config(?IOQ2_CLASSES_CONFIG, atom_to_list(Class), Value, Reason). + + +set_user_config(User, Value, Reason) when is_binary(User) -> + set_user_config(binary_to_list(User), Value, Reason); +set_user_config(User, Value, Reason) -> + ok = check_float_value(Value), + %% TODO: validate User exists (how to do this without a Req?) + ok = set_config(?IOQ2_USERS_CONFIG, User, Value, Reason). + + +is_valid_class(Class) -> + lists:member(Class, ioq_classes()). + + +check_float_value(Value) when is_float(Value) -> + ok; +check_float_value(_) -> + erlang:error({badarg, invalid_float_value}). + + +set_config(Section, Key, Value, Reason) when is_float(Value) -> + set_config(Section, Key, float_to_list(Value), Reason); +set_config(Section, Key, Value, Reason) when is_binary(Key) -> + set_config(Section, binary_to_list(Key), Value, Reason); +set_config(Section, Key, Value, Reason) -> + ok = config:set(Section, Key, Value, Reason). + + +-spec build_shard_priorities() -> {ok, khash:khash()}. +build_shard_priorities() -> + Configs = lists:foldl( + fun({Key0, Val}, Acc) -> + case parse_shard_string(Key0) of + {error, ShardString} -> + couch_log:error( + "IOQ error parsing shard config: ~p", + [ShardString] + ), + Acc; + Key -> + [{Key, to_float(Val)} | Acc] + end + end, + [], + config:get("ioq2.shards") + ), + build_shard_priorities(Configs). + + +-spec build_shard_priorities([{any(), float()}]) -> {ok, khash:khash()}. +build_shard_priorities(Configs) -> + init_config_priorities(Configs). 
+ + +-spec build_user_priorities() -> {ok, khash:khash()}. +build_user_priorities() -> + build_user_priorities(config:get("ioq2.users")). + + +-spec build_user_priorities([{any(), float()}]) -> {ok, khash:khash()}. +build_user_priorities(Configs0) -> + Configs = [{list_to_binary(K), to_float(V)} || {K,V} <- Configs0], + init_config_priorities(Configs). + + +-spec build_class_priorities() -> {ok, khash:khash()}. +build_class_priorities() -> + build_class_priorities(config:get("ioq2.classes")). + + +-spec build_class_priorities([{any(), float()}]) -> {ok, khash:khash()}. +build_class_priorities(Configs0) -> + {ok, ClassP} = khash:new(), + ok = add_default_class_priorities(ClassP), + Configs = [{list_to_existing_atom(K), to_float(V)} || {K,V} <- Configs0], + init_config_priorities(Configs, ClassP). + + +-spec parse_shard_string(string()) -> {binary(), atom()} + | {error, string()}. +parse_shard_string(ShardString) -> + case string:tokens(ShardString, ?SHARD_CLASS_SEPARATOR) of + [Shard, Class] -> + {list_to_binary(Shard), list_to_existing_atom(Class)}; + _ -> + {error, ShardString} + end. + + +-spec add_default_class_priorities(khash:khash()) -> ok. +add_default_class_priorities(ClassP) -> + ok = lists:foreach( + fun({Class, Priority}) -> + ok = khash:put(ClassP, Class, Priority) + end, + ?DEFAULT_CLASS_PRIORITIES + ). + + +-spec to_float(any()) -> float(). +to_float(V) -> + to_float(V, ?DEFAULT_PRIORITY). + + +-spec to_float(any(), float()) -> float(). +to_float(Float, _) when is_float(Float) -> + Float; +to_float(Int, _) when is_integer(Int) -> + float(Int); +to_float(String, Default) when is_list(String) -> + try + list_to_float(String) + catch error:badarg -> + try + to_float(list_to_integer(String)) + catch error:badarg -> + Default + end + end; +to_float(_, Default) -> + Default. + + +-spec prioritize(ioq_request(), khash:khash(), khash:khash(), khash:khash()) -> + float(). 
+prioritize(#ioq_request{} = Req, ClassP, UserP, ShardP) -> + #ioq_request{ + user=User, + shard=Shard, + class=Class + } = Req, + UP = get_priority(UserP, User), + CP = get_priority(ClassP, Class), + SP = get_priority(ShardP, {Shard, Class}), + UP * CP * SP. + + +-spec init_config_priorities([{any(), float()}]) -> {ok, khash:khash()}. +init_config_priorities(Configs) -> + {ok, Hash} = khash:new(), + init_config_priorities(Configs, Hash). + + +-spec init_config_priorities([{any(), float()}], khash:khash()) -> + {ok, khash:khash()}. +init_config_priorities(Configs, Hash) -> + ok = lists:foreach( + fun({Key, Val}) -> + ok = khash:put(Hash, Key, Val) + end, + Configs + ), + {ok, Hash}. + + +-spec check_priority(atom(), binary(), binary()) -> float(). +check_priority(Class, User, Shard0) -> + {ok, ClassP} = build_class_priorities(), + {ok, UserP} = build_user_priorities(), + {ok, ShardP} = build_shard_priorities(), + + Shard = filename:rootname(Shard0), + Req = #ioq_request{ + user = User, + shard = Shard, + class = Class + }, + + prioritize(Req, ClassP, UserP, ShardP). + + +get_priority(KH, Key) -> + get_priority(KH, Key, ?DEFAULT_PRIORITY). + + +get_priority(_KH, undefined, Default) -> + Default; +get_priority(KH, Key, Default) -> + khash:get(KH, Key, Default). diff --git a/src/ioq_config_listener.erl b/src/ioq_config_listener.erl new file mode 100644 index 0000000..fc10880 --- /dev/null +++ b/src/ioq_config_listener.erl @@ -0,0 +1,54 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_config_listener).
+
+-vsn(2).
+-behaviour(config_listener).
+
+-export([
+    subscribe/0
+]).
+
+-export([
+    handle_config_change/5,
+    handle_config_terminate/3
+]).
+
+subscribe() ->
+    config:listen_for_changes(?MODULE, nil).
+
+handle_config_change("ioq", _Key, _Val, _Persist, St) ->
+    ok = notify_ioq_pids(),
+    {ok, St};
+handle_config_change("ioq2", _Key, _Val, _Persist, St) ->
+    ok = notify_ioq_pids(),
+    {ok, St};
+handle_config_change("ioq2."++_Type, _Key, _Val, _Persist, St) ->
+    ok = notify_ioq_pids(),
+    {ok, St};
+handle_config_change(_Sec, _Key, _Val, _Persist, St) ->
+    {ok, St}.
+
+handle_config_terminate(_, stop, _) -> ok;
+handle_config_terminate(_, _, _) ->
+    % We may have missed a change in the last five seconds
+    gen_server:cast(ioq_server, update_config),
+    spawn(fun() ->
+        timer:sleep(5000),
+        config:listen_for_changes(?MODULE, nil)
+    end).
+
+notify_ioq_pids() ->
+    ok = lists:foreach(fun(Pid) ->
+        gen_server:cast(Pid, update_config)
+    end, ioq_sup:get_ioq2_servers()).
diff --git a/src/ioq_kv.erl b/src/ioq_kv.erl
new file mode 100644
index 0000000..cf8b077
--- /dev/null
+++ b/src/ioq_kv.erl
@@ -0,0 +1,169 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% Based on Bob Ippolito's mochiglobal.erl
+
+%%
+-module(ioq_kv).
+-export([init/0]).
+-export([all/0, get/1, get/2, put/2, delete/1]).
+-export([validate_term/1]).
+ +-define(DYNMOD, ioq_kv_dyn). +-define(ERLFILE, "ioq_kv_dyn.erl"). + +-spec init() -> ok. +%% @doc Initialize the dynamic module +init() -> + compile(all()). + +-spec all() -> [{any(), any()}]. +%% @doc Get the list of Key/Val pairs stored +all() -> + try + ?DYNMOD:list() + catch error:undef -> + [] + end. + +-spec get(any()) -> any() | undefined. +%% @equiv get(Key, undefined) +get(Key) -> + get(Key, undefined). + +-spec get(any(), T) -> any() | T. +%% @doc Get the term for Key or return Default. +get(Key, Default) -> + try + ?DYNMOD:lookup(Key, Default) + catch error:undef -> + Default + end. + +-spec put(any(), any()) -> ok. +%% @doc Store term Val at Key, replaces an existing term if present. +put(Key, Val) -> + KVs = proplists:delete(Key, all()), + compile([{Key, Val} | KVs]). + +-spec delete(any()) -> ok. +%% @doc Delete term stored at Key, no-op if non-existent. +delete(Key) -> + KVs = proplists:delete(Key, all()), + compile(KVs). + + +compile(KVs) -> + Bin = compile_mod(KVs), + code:purge(?DYNMOD), + {module, ?DYNMOD} = code:load_binary(?DYNMOD, ?ERLFILE, Bin), + ok. + + +-spec compile_mod([any()]) -> binary(). +compile_mod(KVs) -> + Opts = [verbose, report_errors], + {ok, ?DYNMOD, Bin} = compile:forms(forms(KVs), Opts), + Bin. + + +-spec forms([any()]) -> [erl_syntax:syntaxTree()]. +forms(KVs) -> + validate_term(KVs), + Statements = [ + module_stmt(), + export_stmt(), + list_function(KVs), + lookup_function(KVs) + ], + [erl_syntax:revert(X) || X <- Statements]. + + +-spec module_stmt() -> erl_syntax:syntaxTree(). +module_stmt() -> + erl_syntax:attribute( + erl_syntax:atom(module), + [erl_syntax:atom(?DYNMOD)] + ). + +-spec export_stmt() -> erl_syntax:syntaxTree(). +export_stmt() -> + erl_syntax:attribute( + erl_syntax:atom(export), + [erl_syntax:list([ + erl_syntax:arity_qualifier( + erl_syntax:atom(list), + erl_syntax:integer(0)), + erl_syntax:arity_qualifier( + erl_syntax:atom(lookup), + erl_syntax:integer(2)) + ])] + ). 
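`compile/1` above implements the mochiglobal trick this module is based on: the whole key/value table is compiled into a fresh `ioq_kv_dyn` module and hot-loaded, so every lookup is a plain function call against constants, with no shared mutable state to copy or lock. A rough Python analog of the idea, generating and executing a lookup function from source:

```python
def compile_kv(kvs):
    """Bake a key/value table into generated code, mochiglobal-style.

    Reads then touch only literals compiled into the function body; to
    "update" the table you regenerate and swap the whole function, just
    as ioq_kv recompiles and reloads ioq_kv_dyn.
    """
    src = "def lookup(key, default=None):\n"
    for k, v in kvs.items():
        src += "    if key == {!r}: return {!r}\n".format(k, v)
    src += "    return default\n"
    namespace = {}
    exec(src, namespace)  # analog of compile:forms + code:load_binary
    return namespace["lookup"]
```

This is only a sketch of the technique; the Erlang version gets its payoff from the BEAM's constant pool, which Python does not share.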
+ + +-spec list_function([any()]) -> erl_syntax:syntaxTree(). +list_function(KVs) -> + erl_syntax:function( + erl_syntax:atom(list), + [erl_syntax:clause([], none, [erl_syntax:abstract(KVs)])]). + + +-spec lookup_function([any()]) -> erl_syntax:syntaxTree(). +lookup_function(KVs) -> + Clauses = lists:foldl(fun({K, V}, ClauseAcc) -> + Patterns = [erl_syntax:abstract(K), erl_syntax:underscore()], + Bodies = [erl_syntax:abstract(V)], + [erl_syntax:clause(Patterns, none, Bodies) | ClauseAcc] + end, [default_clause()], KVs), + erl_syntax:function(erl_syntax:atom(lookup), Clauses). + + +-spec default_clause() -> erl_syntax:syntaxTree(). +default_clause() -> + Patterns = [erl_syntax:underscore(), erl_syntax:variable("Default")], + Bodies = [erl_syntax:variable("Default")], + erl_syntax:clause(Patterns, none, Bodies). + + +-spec validate_term(any()) -> ok. +%% @doc Validate that a term is supported. Throws invalid_term +%% on error. +validate_term(T) when is_list(T) -> + validate_list(T); +validate_term(T) when is_tuple(T) -> + validate_tuple(T); +validate_term(T) when is_bitstring(T) -> + case bit_size(T) rem 8 of + 0 -> ok; + _ -> erlang:error(invalid_term) + end; +validate_term(_T) -> + ok. + +-spec validate_list(list()) -> ok. +validate_list([]) -> + ok; +validate_list([H|T]) -> + validate_term(H), + validate_list(T). + +-spec validate_tuple(tuple()) -> ok. +validate_tuple(T) -> + validate_tuple(T, 1, size(T)). + +-spec validate_tuple(tuple(), pos_integer(), pos_integer()) -> ok. +validate_tuple(T, Pos, Size) when Pos =< Size -> + validate_term(element(Pos, T)), + validate_tuple(T, Pos+1, Size); +validate_tuple(_, _, _) -> + ok. + diff --git a/src/ioq_osq.erl b/src/ioq_osq.erl new file mode 100644 index 0000000..2dbfaea --- /dev/null +++ b/src/ioq_osq.erl @@ -0,0 +1,253 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_osq). +-behaviour(gen_server). +-vsn(1). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/0, call/3]). + +-record(channel, { + name, + q = queue:new() +}). + +-record(state, { + reqs = [], + min = 2, + max = 6, + global_max = 15, + channels = queue:new() +}). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% WARNING %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This server relies on the internal structure of the channels queue as a %% +%% {list(), list()} to do in-place modifications of some elements. We are %% +%% "running on thin ice", as it were. %% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +call(Pid, Msg, Priority) -> + Reply = gen_server:call(ioq_osq, {rlimit, nil, Priority, now()}, infinity), + try + gen_server:call(Pid, Msg, infinity) + after + whereis(ioq_osq) ! Reply + end. + + +init([]) -> + ets:new(osq_counters, [named_table]), + St = #state{}, + {ok, St#state{ + min = threshold("minimum", St#state.min), + max = threshold("maximum", St#state.max), + global_max = threshold("global_maximum", St#state.global_max) + }}. 
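`call/3` above is a slot protocol rather than a proxy: the caller asks `ioq_osq` for permission (the `rlimit` request), performs the `gen_server:call` to the os process itself, and then sends the reply token back so the slot is freed. A Python sketch of the same shape, using a semaphore as the slot counter (`OsProcQueue` is a hypothetical name for illustration):

```python
import threading

class OsProcQueue:
    """Toy analog of ioq_osq's rlimit protocol: the queue grants a slot,
    the caller performs its own request, then hands the slot back."""

    def __init__(self, limit):
        self._slots = threading.BoundedSemaphore(limit)

    def call(self, do_request):
        self._slots.acquire()      # like gen_server:call(ioq_osq, {rlimit, ...})
        try:
            return do_request()    # caller talks to the os process directly
        finally:
            self._slots.release()  # like whereis(ioq_osq) ! Reply
```

The point of the design is that the queue server never sits on the data path; it only meters admission, which is why the `try ... after` in `call/3` must always return the token even when the real call fails.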
+ +handle_call({set_minimum, C}, _From, State) when is_integer(C), C > 0 -> + {reply, ok, State#state{min = C}}; + +handle_call({set_maximum, C}, _From, State) when is_integer(C), C > 0 -> + {reply, ok, State#state{max = C}}; + +handle_call({set_global_maximum, C}, _From, State) when is_integer(C), C > 0 -> + {reply, ok, State#state{global_max = C}}; + +handle_call(get_queue_depths, _From, State) -> + Channels = [{N, queue:len(Q)} || #channel{name=N, q=Q} + <- queue:to_list(State#state.channels)], + {reply, Channels, State}; + +handle_call(get_requests, _From, State) -> + {reply, State#state.reqs, State}; + +handle_call({_, _, {interactive, Shard}, _} = Req, From, State) -> + {noreply, enqueue_channel(channel_name(Shard), {Req, From}, State)}; + +handle_call({_, _, {view_update, Shard, _}, _} = Req, From, State) -> + {noreply, enqueue_channel(channel_name(Shard), {Req, From}, State)}; + +handle_call({Fd, _, _, _} = Req, From, State) when is_pid(Fd) -> + {noreply, enqueue_channel(other, {Req, From}, State)}; + +handle_call({rlimit, _, _, _} = Req, From, State) -> + {noreply, enqueue_channel(other, {Req, From}, State)}; + +handle_call(_Msg, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. 
+
+handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
+    case lists:keyfind(Ref, 3, Reqs) of
+        {_, notify, Ref} ->
+            erlang:demonitor(Ref, [flush]),
+            Reqs2 = lists:keydelete(Ref, 3, Reqs),
+            {noreply, make_next_request(State#state{reqs = Reqs2})};
+        {_, From, Ref} ->
+            erlang:demonitor(Ref, [flush]),
+            gen_server:reply(From, Reply),
+            Reqs2 = lists:keydelete(Ref, 3, Reqs),
+            {noreply, make_next_request(State#state{reqs = Reqs2})};
+        false ->
+            {noreply, State}
+    end;
+
+handle_info({'DOWN', Ref, _, _, Reason}, #state{reqs = Reqs} = State) ->
+    case lists:keyfind(Ref, 3, Reqs) of
+        {_, notify, Ref} ->
+            Reqs2 = lists:keydelete(Ref, 3, Reqs),
+            {noreply, make_next_request(State#state{reqs = Reqs2})};
+        {_, From, Ref} ->
+            gen_server:reply(From, {'EXIT', Reason}),
+            Reqs2 = lists:keydelete(Ref, 3, Reqs),
+            {noreply, make_next_request(State#state{reqs = Reqs2})};
+        false ->
+            {noreply, State}
+    end;
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+channel_name(Shard) ->
+    try re:split(Shard, "/") of
+        [<<"shards">>, _, <<"heroku">>, AppId | _] ->
+            <<"heroku/", AppId/binary>>;
+        [<<"shards">>, _, Account | _] ->
+            Account;
+        _ ->
+            other
+    catch _:_ ->
+        other
+    end.
+
+find_channel(Account, {A, B}) ->
+    case lists:keyfind(Account, 2, A) of
+        false ->
+            case lists:keyfind(Account, 2, B) of
+                false ->
+                    {new, #channel{name = Account}};
+                #channel{} = Channel ->
+                    {2, Channel}
+            end;
+        #channel{} = Channel ->
+            {1, Channel}
+    end.
+
+update_channel(#channel{q = Q} = Ch, Req) ->
+    Ch#channel{q = queue:in(Req, Q)}.
+ +enqueue_channel(Account, Req, #state{channels = Q} = State) -> + NewState = case find_channel(Account, Q) of + {new, Channel0} -> + State#state{channels = queue:in(update_channel(Channel0, Req), Q)}; + {Elem, Channel0} -> + Channel = update_channel(Channel0, Req), + % the channel already exists in the queue - update it in place + L = element(Elem, Q), + NewQ = setelement(Elem, Q, lists:keyreplace(Account, 2, L, Channel)), + State#state{channels = NewQ} + end, + maybe_submit_request(NewState). + +maybe_submit_request(#state{global_max=C, reqs=R} = St) when length(R) < C -> + make_next_request(St); +maybe_submit_request(#state{min = Min} = State) -> + % look for a channel which hasn't reached the minimum yet + make_next_request(State, Min). + +make_next_request(#state{max = Max} = State) -> + % default behavior, look for a channel not yet maxed out + make_next_request(State, Max). + +make_next_request(#state{channels = Channels, reqs = R} = State, Threshold) -> + case next_unblocked_channel(Channels, R, Threshold, queue:new()) of + {#channel{name = Name, q = Q} = Ch, OutChannels} -> + {{value, Item}, NewQ} = queue:out(Q), + case queue:is_empty(NewQ) of + true -> + NewCh = OutChannels; + false -> + NewCh = queue:in(Ch#channel{q = NewQ}, OutChannels) + end, + submit_request(Name, Item, State#state{channels = NewCh}); + {nil, OutQ} -> + % everybody is using their allotted slots, try again later + State#state{channels = OutQ} + end. + +next_unblocked_channel(InQ, Reqs, Max, OutQ) -> + case queue:out(InQ) of + {empty, _} -> % all channels blocked + {nil, OutQ}; + {{value, #channel{name=Name} = Channel}, NewQ} -> + case length([1 || {N, _, _} <- Reqs, N =:= Name]) >= Max of + true -> % channel is blocked, keep searching + next_unblocked_channel(NewQ, Reqs, Max, queue:in(Channel, OutQ)); + false -> + {Channel, queue:join(NewQ, OutQ)} + end + end. 
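`next_unblocked_channel/4` rotates through the channel queue, skipping any channel whose in-flight request count has already reached the threshold, and re-queues the skipped channels behind the remainder. A Python sketch of that rotation:

```python
from collections import deque

def next_unblocked_channel(channels, active, cap):
    """Pick the first channel under its concurrency cap.

    channels: deque of channel names; active: name -> in-flight count.
    Skipped (blocked) channels are moved behind the remaining ones, as
    queue:join(NewQ, OutQ) does in next_unblocked_channel/4.
    Returns (channel_or_None, new_queue).
    """
    skipped = deque()
    while channels:
        ch = channels.popleft()
        if active.get(ch, 0) >= cap:
            skipped.append(ch)          # blocked, keep searching
        else:
            channels.extend(skipped)    # rest first, then the skipped ones
            return ch, channels
    return None, skipped                # everybody is at their limit
```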
+ + +submit_request(Channel, {{rlimit,_,Pri,T0}, From}, #state{reqs=Reqs} = State) -> + % rlimit fd means that we'll get a response back + % from the pid after it performs the call on its + % own + Ref = erlang:monitor(process, element(1, From)), + gen_server:reply(From, {Ref, nil}), + record_stats(Channel, Pri, T0), + State#state{reqs = [{Channel, notify, Ref} | Reqs]}; + +submit_request(Channel, {{Fd,Call,Pri,T0}, From}, #state{reqs=Reqs} = State) -> + % make the request + Ref = erlang:monitor(process, Fd), + Fd ! {'$gen_call', {self(), Ref}, Call}, + record_stats(Channel, Pri, T0), + State#state{reqs = [{Channel, From, Ref} | Reqs]}. + +record_stats(Channel, Pri, T0) -> + IOClass = if is_tuple(Pri) -> element(1, Pri); true -> Pri end, + Latency = timer:now_diff(now(),T0) / 1000, + catch couch_stats:increment_counter([couchdb, io_queue, IOClass]), + catch couch_stats:increment_counter([couchdb, io_queue, osproc]), + catch couch_stats:update_histogram([couchdb, io_queue, latency], Latency), + update_counter(Channel, IOClass, osproc). + +update_counter(Channel, IOClass, RW) -> + try ets:update_counter(osq_counters, {Channel, IOClass, RW}, 1) + catch error:badarg -> + ets:insert(osq_counters, {{Channel, IOClass, RW}, 1}) + end. + +threshold(Name, Default) -> + try list_to_integer(config:get("osq", Name)) of + C when C > 0-> + C; + _ -> + Default + catch _:_ -> + Default + end. diff --git a/src/ioq_server.erl b/src/ioq_server.erl new file mode 100644 index 0000000..8e0891d --- /dev/null +++ b/src/ioq_server.erl @@ -0,0 +1,608 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_server). +-behaviour(gen_server). +-vsn(1). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/0, call/3]). +-export([add_channel/3, rem_channel/2, list_custom_channels/0]). + +-record(channel, { + name, + qI = queue:new(), % client readers + qU = queue:new(), % db updater + qV = queue:new() % view index updates +}). + +-record(state, { + counters, + histos, + reqs = [], + concurrency = 20, + channels = queue:new(), + qC = queue:new(), % compaction + qR = queue:new(), % internal replication + qL = queue:new(), + dedupe, + class_priorities, + op_priorities +}). + +-record(request, { + fd, + msg, + class, + channel, % the name of the channel, not the actual data structure + from, + ref, + t0, + tsub +}). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% WARNING %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This server relies on the internal structure of the channels queue as a %% +%% {list(), list()} to do in-place modifications of some elements. We are %% +%% "running on thin ice", as it were. %% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +call(Fd, Msg, Priority) -> + {Class, Channel} = analyze_priority(Priority), + Request = #request{ + fd = Fd, + msg = Msg, + channel = Channel, + class = Class, + t0 = now() + }, + case config:get("ioq.bypass", atom_to_list(Class)) of + "true" -> + RW = rw(Msg), + catch couch_stats:increment_counter([couchdb, io_queue_bypassed, Class]), + catch couch_stats:increment_counter([couchdb, io_queue_bypassed, RW]), + gen_server:call(Fd, Msg, infinity); + _ -> + gen_server:call(?MODULE, Request, infinity) + end. + +add_channel(Account, DbName, ChannelName) -> + ok = ioq_kv:put({Account, DbName}, ChannelName). 
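`call/3` above consults `ioq.bypass` per IO class before queueing anything: a bypassed class talks straight to the file descriptor and only bumps the bypass counters. A minimal Python sketch of the dispatch decision (the callables stand in for the two `gen_server:call` targets):

```python
def dispatch(fd_call, queued_call, io_class, bypass_config):
    """Mirror the bypass branch of ioq_server:call/3: if the class is
    configured with bypass = "true", skip the queue entirely."""
    if bypass_config.get(io_class) == "true":
        return fd_call()      # direct call to the Fd process
    return queued_call()      # route through the IOQ server
```

This config check on every call is exactly the escape hatch IOQ2 is designed to make unnecessary.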
+ +rem_channel(Account, DbName) -> + ok = ioq_kv:delete({Account, DbName}). + +list_custom_channels() -> + ioq_kv:all(). + +init([]) -> + State = #state{ + counters = ets:new(ioq_counters, []), + histos = ets:new(ioq_histos, [named_table, ordered_set]) + }, + erlang:send_after(get_interval(), self(), dump_table), + {ok, update_config(State)}. + +handle_call({set_priority, Pri}, _From, State) -> + {reply, process_flag(priority, Pri), State, 0}; + +handle_call({set_concurrency, C}, _From, State) when is_integer(C), C > 0 -> + {reply, State#state.concurrency, State#state{concurrency = C}, 0}; + +handle_call(get_concurrency, _From, State) -> + {reply, State#state.concurrency, State, 0}; + +handle_call(get_counters, _From, #state{counters = Tab} = State) -> + {reply, Tab, State, 0}; + +handle_call(get_queue_depths, _From, State) -> + Channels = lists:map(fun(#channel{name=N, qI=I, qU=U, qV=V}) -> + {N, [queue:len(I), queue:len(U), queue:len(V)]} + end, queue:to_list(State#state.channels)), + Response = [ + {compaction, queue:len(State#state.qC)}, + {replication, queue:len(State#state.qR)}, + {low, queue:len(State#state.qL)}, + {channels, {Channels}} + ], + {reply, Response, State, 0}; + +handle_call(reset_histos, _From, State) -> + ets:delete_all_objects(State#state.histos), + {reply, ok, State, 0}; + +handle_call(#request{} = Req, From, State) -> + {noreply, enqueue_request(Req#request{from = From}, State), 0}; + +%% backwards-compatible mode for messages sent during hot upgrade +handle_call({Fd, Msg, Priority, T0}, From, State) -> + {Class, Chan} = analyze_priority(Priority), + R = #request{fd=Fd, msg=Msg, channel=Chan, class=Class, t0=T0, from=From}, + {noreply, enqueue_request(R, State), 0}; + +handle_call(_Msg, _From, State) -> + {reply, ignored, State, 0}. + +handle_cast(update_config, State) -> + {noreply, update_config(State), 0}; + +handle_cast(_Msg, State) -> + {noreply, State, 0}. 
+ +handle_info({Ref, Reply}, #state{reqs = Reqs} = State) -> + case lists:keytake(Ref, #request.ref, Reqs) of + {value, #request{from=From} = Req, Reqs2} -> + TResponse = erlang:now(), + erlang:demonitor(Ref, [flush]), + reply_to_all(From, Reply), + update_histograms(ioq_histos, Req, TResponse), + {noreply, State#state{reqs = Reqs2}, 0}; + false -> + {noreply, State, 0} + end; + +handle_info({'DOWN', Ref, _, _, Reason}, #state{reqs = Reqs} = State) -> + case lists:keytake(Ref, #request.ref, Reqs) of + {value, #request{from=From}, Reqs2} -> + reply_to_all(From, {'EXIT', Reason}), + {noreply, State#state{reqs = Reqs2}, 0}; + false -> + {noreply, State, 0} + end; + +handle_info(dump_table, State) -> + erlang:send_after(get_interval(), self(), dump_table), + {noreply, dump_table(State), 0}; + +handle_info(timeout, State) -> + {noreply, maybe_submit_request(State)}; + +handle_info(_Info, State) -> + {noreply, State, 0}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, #state{}=State, _Extra) -> + {ok, State}. + +update_config(State) -> + Concurrency = try + list_to_integer(config:get("ioq", "concurrency", "20")) + catch _:_ -> + 20 + end, + + DeDupe = config:get("ioq", "dedupe", "true") == "true", + + P1 = to_float(catch config:get("ioq", "customer", "1.0")), + P2 = to_float(catch config:get("ioq", "replication", "0.001")), + P3 = to_float(catch config:get("ioq", "compaction", "0.0001")), + P4 = to_float(catch config:get("ioq", "low", "0.0001")), + + P5 = to_float(catch config:get("ioq", "reads", "1.0")), + P6 = to_float(catch config:get("ioq", "writes", "1.0")), + P7 = to_float(catch config:get("ioq", "views", "1.0")), + + State#state{ + concurrency = Concurrency, + dedupe = DeDupe, + class_priorities = [P1, P2, P3, P4], + op_priorities = [P5, P6, P7] + }. + +reply_to_all([], _Reply) -> + ok; +reply_to_all([From|Rest], Reply) -> + gen_server:reply(From, Reply), + reply_to_all(Rest, Reply); +reply_to_all(From, Reply) -> + gen_server:reply(From, Reply). 
+
+analyze_priority({interactive, Shard}) ->
+    {interactive, channel_name(Shard)};
+analyze_priority({db_update, Shard}) ->
+    {db_update, channel_name(Shard)};
+analyze_priority({view_update, Shard, _GroupId}) ->
+    {view_update, channel_name(Shard)};
+analyze_priority({db_compact, _Shard}) ->
+    {db_compact, nil};
+analyze_priority({view_compact, _Shard, _GroupId}) ->
+    {view_compact, nil};
+analyze_priority({internal_repl, _Shard}) ->
+    {internal_repl, nil};
+analyze_priority({low, _Shard}) ->
+    {low, nil};
+analyze_priority(_Else) ->
+    {other, other}.
+
+channel_name(Shard) ->
+    try split(Shard) of
+        [<<"shards">>, _, <<"heroku">>, AppId | _] ->
+            <<"heroku/", AppId/binary>>;
+        [<<"shards">>, _, DbName] ->
+            ioq_kv:get({other, DbName}, other);
+        [<<"shards">>, _, Account, DbName] ->
+            ioq_kv:get({Account, DbName}, Account);
+        [<<"shards">>, _, Account | DbParts] ->
+            ioq_kv:get({Account, filename:join(DbParts)}, Account);
+        _ ->
+            other
+    catch _:_ ->
+        other
+    end.
+
+enqueue_request(#request{class = db_compact} = Req, State) ->
+    State#state{qC = update_queue(Req, State#state.qC, State#state.dedupe)};
+enqueue_request(#request{class = view_compact} = Req, State) ->
+    State#state{qC = update_queue(Req, State#state.qC, State#state.dedupe)};
+enqueue_request(#request{class = internal_repl} = Req, State) ->
+    State#state{qR = update_queue(Req, State#state.qR, State#state.dedupe)};
+enqueue_request(#request{class = low} = Req, State) ->
+    State#state{qL = update_queue(Req, State#state.qL, State#state.dedupe)};
+enqueue_request(Req, State) ->
+    enqueue_channel(Req, State).
+
+find_channel(Account, {A, B}) ->
+    case lists:keyfind(Account, #channel.name, A) of
+        false ->
+            case lists:keyfind(Account, #channel.name, B) of
+                false ->
+                    {new, #channel{name = Account}};
+                #channel{} = Channel ->
+                    {2, Channel}
+            end;
+        #channel{} = Channel ->
+            {1, Channel}
+    end.
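`channel_name/1` derives the accounting channel from the shard path, honoring any custom channels registered through `ioq_kv`. A Python sketch with the `ioq_kv` lookups replaced by a plain dict:

```python
def channel_name(shard, custom=None):
    """Sketch of ioq_server:channel_name/1.

    custom maps (account, dbname) -> channel, standing in for ioq_kv;
    heroku shards get a per-app channel, everything unrecognized lands
    in the shared "other" channel.
    """
    custom = custom or {}
    parts = shard.split("/")
    if len(parts) < 3 or parts[0] != "shards":
        return "other"
    if parts[2] == "heroku" and len(parts) > 3:
        return "heroku/" + parts[3]
    if len(parts) == 3:
        return custom.get(("other", parts[2]), "other")
    account, db = parts[2], "/".join(parts[3:])
    return custom.get((account, db), account)
```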
+ +update_channel(Ch, #request{class = view_update} = Req, Dedupe) -> + Ch#channel{qV = update_queue(Req, Ch#channel.qV, Dedupe)}; +update_channel(Ch, #request{class = db_update} = Req, Dedupe) -> + Ch#channel{qU = update_queue(Req, Ch#channel.qU, Dedupe)}; +update_channel(Ch, Req, Dedupe) -> + % everything else is interactive IO class + Ch#channel{qI = update_queue(Req, Ch#channel.qI, Dedupe)}. + +update_queue(#request{from=From, fd=Fd, msg={pread_iolist, Pos}}=Req, Q, DD) -> + case maybe_dedupe(Fd, Pos, Q, DD) of + false -> + queue:in(Req, Q); + {Elem, N, #request{from=From1}=Match} -> + catch couch_stats:increment_counter([couchdb, io_queue, merged]), + Match1 = Match#request{from=append(From, From1)}, + L = element(Elem, Q), + {H, [Match|T]} = lists:split(N, L), + setelement(Elem, Q, H ++ [Match1|T]) + end; +update_queue(Req, Q, _Dedupe) -> + queue:in(Req, Q). + +append(A, B) when is_list(B) -> + [A|B]; +append(A, B) -> + [A, B]. + +maybe_dedupe(Fd, Pos, Q, Dedupe) -> + case Dedupe of + true -> matching_request(Fd, Pos, Q); + false -> false + end. + +matching_request(Fd, Pos, {A, B}) -> + case matching_request(Fd, Pos, A) of + false -> + case matching_request(Fd, Pos, B) of + false -> + false; + {N, Request} -> + {2, N, Request} + end; + {N, Request} -> + {1, N, Request} + end; +matching_request(Fd, Pos, List) -> + matching_request(Fd, Pos, 0, List). + +matching_request(_Fd, _Pos, _N, []) -> + false; +matching_request(Fd, Pos, N, [#request{fd=Fd, msg={pread_iolist, Pos}}=Req|_]) -> + {N, Req}; +matching_request(Fd, Pos, N, [_|Rest]) -> + matching_request(Fd, Pos, N + 1, Rest). 
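`update_queue/3` above implements the dedupe: an incoming `{pread_iolist, Pos}` that matches a read already queued on the same `Fd` is merged into it, so a single disk read later answers every caller via `reply_to_all/2`. A Python sketch of the merge, with requests as dicts and the caller list playing the role of the accumulated `From`s:

```python
def update_queue(req, queue, dedupe):
    """Merge identical reads (same fd and pread position) into one queued
    request; otherwise append, as update_queue/3 does."""
    if dedupe and req["msg"][0] == "pread_iolist":
        for pending in queue:
            if pending["fd"] == req["fd"] and pending["msg"] == req["msg"]:
                pending["from"].extend(req["from"])  # reply_to_all targets
                return queue
    queue.append(req)
    return queue
```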
+ +enqueue_channel(#request{channel=Account} = Req, #state{channels=Q} = State) -> + DD = State#state.dedupe, + case find_channel(Account, Q) of + {new, Channel0} -> + State#state{channels = queue:in(update_channel(Channel0, Req, DD), Q)}; + {Elem, Channel0} -> + Channel = update_channel(Channel0, Req, DD), + % the channel already exists in the queue - update it in place + L = element(Elem, Q), + NewL = lists:keyreplace(Account, #channel.name, L, Channel), + NewQ = setelement(Elem, Q, NewL), + State#state{channels = NewQ} + end. + +maybe_submit_request(#state{concurrency=C,reqs=R} = St) when length(R) < C -> + case make_next_request(St) of + St -> + St; + NewState when length(R) >= C-1 -> + NewState; + NewState -> + maybe_submit_request(NewState) + end; +maybe_submit_request(State) -> + State. + +sort_queues(QueuesAndPriorities, Normalization, Choice) -> + sort_queues(QueuesAndPriorities, Normalization, Choice, 0, [], []). + +sort_queues([{Q, _Pri}], _Norm, _Choice, _X, [], Acc) -> + lists:reverse([Q | Acc]); +sort_queues([{Q, Pri}], Norm, Choice, _X, Skipped, Acc) -> + sort_queues(lists:reverse(Skipped), Norm - Pri, Choice, 0, [], [Q | Acc]); +sort_queues([{Q, Pri} | Rest], Norm, Choice, X, Skipped, Acc) -> + if Choice < ((X + Pri) / Norm) -> + Remaining = lists:reverse(Skipped, Rest), + sort_queues(Remaining, Norm - Pri, Choice, 0, [], [Q | Acc]); + true -> + sort_queues(Rest, Norm, Choice, X + Pri, [{Q, Pri} | Skipped], Acc) + end. + +make_next_request(State) -> + #state{ + channels = Ch, + qC = QC, + qR = QR, + qL = QL, + class_priorities = ClassP, + op_priorities = OpP + } = State, + + + {Item, [NewCh, NewQR, NewQC, NewQL]} = + choose_next_request([Ch, QR, QC, QL], ClassP), + + case Item of nil -> + State; + #channel{qI = QI, qU = QU, qV = QV} = Channel -> + % An IO channel has at least one interactive or view indexing request. 
+ % If the channel has more than one request, we'll toss it back into the + % queue after we've extracted one here + {Item2, [QI2, QU2, QV2]} = + choose_next_request([QI, QU, QV], OpP), + case queue:is_empty(QU2) andalso + queue:is_empty(QI2) andalso + queue:is_empty(QV2) of + true -> + NewCh2 = NewCh; + false -> + NewCh2 = queue:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh) + end, + submit_request(Item2, State#state{channels=NewCh2, qC=NewQC, + qR=NewQR, qL=NewQL}); + _ -> + % Item is a background (compaction or internal replication) task + submit_request(Item, State#state{channels=NewCh, qC=NewQC, qR=NewQR, + qL=NewQL}) + end. + +submit_request(Request, State) -> + #request{ + channel = Channel, + fd = Fd, + msg = Call, + t0 = T0, + class = IOClass + } = Request, + #state{reqs = Reqs, counters = Counters} = State, + + % make the request + Ref = erlang:monitor(process, Fd), + Fd ! {'$gen_call', {self(), Ref}, Call}, + + % record some stats + RW = rw(Call), + SubmitTime = now(), + Latency = timer:now_diff(SubmitTime, T0) / 1000, + catch couch_stats:increment_counter([couchdb, io_queue, IOClass]), + catch couch_stats:increment_counter([couchdb, io_queue, RW]), + catch couch_stats:update_histogram([couchdb, io_queue, latency], Latency), + update_counter(Counters, Channel, IOClass, RW), + State#state{reqs = [Request#request{tsub=SubmitTime, ref=Ref} | Reqs]}. + +update_counter(Tab, Channel, IOClass, RW) -> + upsert(Tab, {Channel, IOClass, RW}, 1). 
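The latency histograms this module maintains bucket values with `timebin/1` (defined just below): ten log-scale bins per decade of microseconds, so nearby latencies share a bin while the range stays compact. The same computation in Python:

```python
import math

def timebin(micros):
    """Log-scale histogram bin, as in timebin/1: trunc(10 * log10(V))."""
    return math.trunc(10 * math.log10(micros))
```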
+ +update_histograms(Tab, Req, TResponse) -> + #request{t0=T0, tsub=TSubmit, class=Class, channel=Channel, msg=Msg} = Req, + Delta1 = timer:now_diff(TSubmit, T0), + Delta2 = timer:now_diff(TResponse, TSubmit), + Bin1 = timebin(Delta1), + Bin2 = timebin(Delta2), + Bin3 = timebin(Delta1+Delta2), + if Channel =/= nil -> + upsert(Tab, {Channel, submit_delay, Bin1}, 1), + upsert(Tab, {Channel, svctm, Bin2}, 1), + upsert(Tab, {Channel, iowait, Bin3}, 1); + true -> ok end, + Key = make_key(Class, Msg), + upsert(Tab, {Key, submit_delay, Bin1}, 1), + upsert(Tab, {Key, svctm, Bin2}, 1), + upsert(Tab, {Key, iowait, Bin3}, 1). + +make_key(db_compact, _) -> + <<"compaction">>; +make_key(view_compact, _) -> + <<"compaction">>; +make_key(internal_repl, _) -> + <<"replication">>; +make_key(low, _) -> + <<"low">>; +make_key(view_update, _) -> + <<"views">>; +make_key(db_update, _) -> + <<"writes">>; +make_key(interactive, {pread_iolist, _}) -> + <<"reads">>; +make_key(interactive, {append_bin, _}) -> + <<"writes">>; +make_key(_, _) -> + <<"other">>. + +upsert(Tab, Key, Incr) -> + try ets:update_counter(Tab, Key, Incr) + catch error:badarg -> + ets:insert(Tab, {Key, Incr}) + end. + +timebin(V) -> + trunc(10*math:log10(V)). + +choose_next_request(Qs, Priorities) -> + Norm = lists:sum(Priorities), + QueuesAndPriorities = lists:zip(Qs, Priorities), + SortedQueues = sort_queues(QueuesAndPriorities, Norm, random:uniform()), + {Item, NewQueues} = choose_prioritized_request(SortedQueues), + Map0 = lists:zip(SortedQueues, NewQueues), + {Item, [element(2, lists:keyfind(Q, 1, Map0)) || Q <- Qs]}. + +choose_prioritized_request(Qs) -> + choose_prioritized_request(Qs, []). + +choose_prioritized_request([], Empties) -> + {nil, lists:reverse(Empties)}; +choose_prioritized_request([Q | Rest], Empties) -> + case queue:out(Q) of + {empty, _} -> + choose_prioritized_request(Rest, [Q | Empties]); + {{value, Item}, NewQ} -> + {Item, lists:reverse([NewQ | Empties], Rest)} + end. 
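`choose_next_request/2` draws one uniform random number and hands it to `sort_queues/3` (defined earlier in this module), which orders the queues by their normalized weights so higher-priority queues are probabilistically drained first, while lower-priority ones still get turns. A direct Python port that reproduces the expected orderings in this module's `sort_queues_test`:

```python
def sort_queues(qps, norm, choice):
    """Port of sort_queues/3: qps is a list of (queue, priority) pairs,
    norm the sum of priorities, choice a single draw in [0, 1).

    Walk the list accumulating probability mass; the queue whose slice
    contains `choice` is taken next, and the remainder is re-normalized
    with the skipped queues rotated behind it.
    """
    order, skipped, x = [], [], 0.0
    while qps:
        q, pri = qps[0]
        if len(qps) == 1:                   # last candidate takes the rest
            order.append(q)
            qps, norm = list(skipped), norm - pri
            skipped, x = [], 0.0
        elif choice < (x + pri) / norm:     # choice fell in this slice
            order.append(q)
            qps, norm = skipped + qps[1:], norm - pri
            skipped, x = [], 0.0
        else:                               # skip, widen the window
            skipped.append(qps[0])
            qps, x = qps[1:], x + pri
    return order
```

Note that the single `choice` is reused for every round rather than re-drawn, matching the Erlang implementation.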
+ +to_float("0") -> + 0.00001; +to_float("1") -> + 1.0; +to_float(String) when is_list(String) -> + try list_to_float(String) catch error:badarg -> 0.5 end; +to_float(_) -> + 0.5. + +dump_table(#state{counters = Tab} = State) -> + Pid = spawn(fun save_to_db/0), + ets:give_away(Tab, Pid, nil), + State#state{counters = ets:new(ioq_counters, [])}. + +save_to_db() -> + Timeout = get_interval(), + receive {'ETS-TRANSFER', Tab, _, _} -> + Dict = ets:foldl(fun table_fold/2, dict:new(), Tab), + TS = list_to_binary(iso8601_timestamp()), + Doc = {[ + {<<"_id">>, TS}, + {type, ioq}, + {node, node()}, + {accounts, {dict:to_list(Dict)}} + ]}, + try + fabric:update_doc(get_stats_dbname(), Doc, []) + catch error:database_does_not_exist -> + couch_log:error("Missing IOQ stats db: ~s", [get_stats_dbname()]) + end + after Timeout -> + error_logger:error_report({?MODULE, "ets transfer failed"}) + end. + +table_fold({{other, _, _}, _}, D) -> + D; +table_fold({{Channel, interactive, reads}, X}, D) -> + dict:update(Channel, fun([A,B,C]) -> [A+X,B,C] end, [X,0,0], D); +table_fold({{Channel, interactive, writes}, X}, D) -> + dict:update(Channel, fun([A,B,C]) -> [A,B+X,C] end, [0,X,0], D); +table_fold({{Channel, db_update, reads}, X}, D) -> + dict:update(Channel, fun([A,B,C]) -> [A,B+X,C] end, [0,X,0], D); +table_fold({{Channel, db_update, writes}, X}, D) -> + dict:update(Channel, fun([A,B,C]) -> [A,B+X,C] end, [0,X,0], D); +table_fold({{Channel, view_update, reads}, X}, D) -> + dict:update(Channel, fun([A,B,C]) -> [A,B,C+X] end, [0,0,X], D); +table_fold({{Channel, view_update, writes}, X}, D) -> + dict:update(Channel, fun([A,B,C]) -> [A,B,C+X] end, [0,0,X], D); +table_fold(_, D) -> + D. + +iso8601_timestamp() -> + {_,_,Micro} = Now = os:timestamp(), + {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now), + Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ", + io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]). 
+ +get_interval() -> + case application:get_env(ioq, stats_interval) of + {ok, Interval} when is_integer(Interval) -> + Interval; + _ -> + 60000 + end. + +get_stats_dbname() -> + case application:get_env(ioq, stats_db) of + {ok, DbName} when is_list(DbName) -> + DbName; + _ -> + "stats" + end. + +split(B) when is_binary(B) -> + split(B, 0, 0, []); +split(B) -> B. + +split(B, O, S, Acc) -> + case B of + <<_:O/binary>> -> + Len = O - S, + <<_:S/binary, Part:Len/binary>> = B, + lists:reverse(Acc, [Part]); + <<_:O/binary, $/, _/binary>> -> + Len = O - S, + <<_:S/binary, Part:Len/binary, _/binary>> = B, + split(B, O+1, O+1, [Part | Acc]); + _ -> + split(B, O+1, S, Acc) + end. + +rw({pread_iolist, _}) -> + reads; +rw({append_bin, _}) -> + writes; +rw(_) -> + unknown. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +sort_queues_test() -> + ?assertEqual([a, b, c], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.10)), + ?assertEqual([a, c, b], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.45)), + ?assertEqual([b, a, c], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.60)), + ?assertEqual([b, c, a], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.65)), + ?assertEqual([c, a, b], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.71)), + ?assertEqual([c, b, a], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.90)). + +-endif. diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl new file mode 100644 index 0000000..5e2e01f --- /dev/null +++ b/src/ioq_server2.erl @@ -0,0 +1,1068 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_server2). +-behavior(gen_server). + + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). +-export([ + start_link/3, + call/3, + pcall/1, + pcall/2 +]). +-export([ + get_state/0, + get_state/1, + update_config/0, + get_queue_depths/0, + get_queue_depths/1, + get_concurrency/0, + set_concurrency/1, + get_counters/0 +]). + + +-include_lib("ioq/include/ioq.hrl"). + + +-define(DEFAULT_RESIZE_LIMIT, 1000). +-define(DEFAULT_CONCURRENCY, 1). +-define(DEFAULT_SCALE_FACTOR, 2.0). +-define(DEFAULT_MAX_PRIORITY, 10000.0). + + +-record(state, { + reqs :: khash:khash(), + waiters :: khash:khash(), + queue :: hqueue:hqueue(), + concurrency = ?DEFAULT_CONCURRENCY :: pos_integer(), + iterations = 0 :: non_neg_integer(), + class_p :: khash:khash(), %% class priorities + user_p :: khash:khash(), %% user priorities + shard_p :: khash:khash(), %% shard priorities + scale_factor = ?DEFAULT_SCALE_FACTOR :: float(), + dedupe = true :: boolean(), + resize_limit = ?DEFAULT_RESIZE_LIMIT :: pos_integer(), + next_key = 1 :: pos_integer(), + server_name :: atom(), + scheduler_id = 0 :: non_neg_integer(), + max_priority = ?DEFAULT_MAX_PRIORITY :: float() +}). + + +-type state() :: #state{}. +-type waiter_key() :: {pid(), integer()} | pos_integer(). +-type priority() :: float(). %% should be non_negative_float(). + +%% Hacky queue_depth type due to existing fixed element lists for JSON in API +%% Actual type is: +%% [ +%% {compaction, non_neg_integer()}, +%% {replication, non_neg_integer()}, +%% {low, non_neg_integer()}, +%% {channels, {[{username(), [Interactive, DbUpdate, ViewUpdate]}]}} +%% ] +%% when Interactive = DbUpdate = ViewUpdate = non_neg_integer(). +-type depths() :: compaction | replication | low. +-type user_depth() :: {binary(), [non_neg_integer()]}. 
+-type depth_ele() :: {depths(), non_neg_integer()} | user_depth(). +-type queue_depths() :: [depth_ele()]. +-type read_write() :: reads | writes | unknown. + + +-define(SERVER_ID(SID), list_to_atom("ioq_server_" ++ integer_to_list(SID))). + + +-spec call(pid(), term(), io_dimensions()) -> term(). +call(Fd, Msg, Dimensions) -> + Req0 = #ioq_request{ + fd = Fd, + msg = Msg, + t0 = os:timestamp() + }, + Req = add_request_dimensions(Req0, Dimensions), + Class = atom_to_list(Req#ioq_request.class), + case config:get_boolean("ioq2.bypass", Class, false) of + true -> + RW = rw(Msg), + couch_stats:increment_counter([couchdb, io_queue2, bypassed_count]), + couch_stats:increment_counter( + [couchdb, io_queue2, RW, bypassed_count]), + gen_server:call(Fd, Msg, infinity); + _ -> + DispatchStrategy = config:get( + "ioq2", "dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER), + Server = case DispatchStrategy of + ?DISPATCH_RANDOM -> + maybe_seed(), + SID = random:uniform(erlang:system_info(schedulers)), + ?SERVER_ID(SID); + ?DISPATCH_FD_HASH -> + NumSchedulers = erlang:system_info(schedulers), + SID = 1 + (erlang:phash2(Fd) rem NumSchedulers), + ?SERVER_ID(SID); + ?DISPATCH_SINGLE_SERVER -> + ?SERVER_ID(1); + _ -> + SID = erlang:system_info(scheduler_id), + ?SERVER_ID(SID) + end, + gen_server:call(Server, Req, infinity) + end. + + +-spec pcall(any()) -> any(). +pcall(Msg) -> + pcall(Msg, 500). + + +-spec pcall(any(), non_neg_integer()) -> any(). 
+pcall(Msg, Timeout) -> + {MainPid, MainRef} = spawn_monitor(fun() -> + PidRefs = lists:map(fun(Name) -> + spawn_monitor(fun() -> + Resp = gen_server:call(Name, Msg, Timeout), + exit({resp_ok, Resp}) + end) + end, ioq_sup:get_ioq2_servers()), + Resps = lists:map(fun({Pid, _}) -> + receive + {'DOWN', _, _, Pid, {resp_ok, Resp}} -> + Resp; + {'DOWN', _, _, Pid, Error} -> + exit(Error) + end + end, PidRefs), + exit({resp_ok, Resps}) + end), + receive + {'DOWN', _, _, MainPid, {resp_ok, Resps}} -> + {ok, Resps}; + {'DOWN', _, _, MainPid, Error} -> + {error, Error} + after Timeout -> + erlang:demonitor(MainRef, [flush]), + exit(MainPid, kill), + {error, timeout} + end. + + +-spec get_queue_depths() -> queue_depths(). +get_queue_depths() -> + case pcall(get_pending_reqs, 500) of + {ok, PReqs} -> + get_queue_depths([Req || {_Priority, Req} <- lists:flatten(PReqs)]); + {error, _} -> + [ + {compaction, ?BAD_MAGIC_NUM}, + {replication, ?BAD_MAGIC_NUM}, + {low, ?BAD_MAGIC_NUM}, + {channels, {[]}} + ] + end. + + +-spec get_queue_depths([ioq_request()]) -> queue_depths(). +get_queue_depths(Reqs) -> + {ok, Users0} = khash:new(), + {Compaction, Replication, Low, Users} = lists:foldl( + fun + (#ioq_request{class=db_compact}, {C, R, L, U}) -> + {C+1, R, L, U}; + (#ioq_request{class=view_compact}, {C, R, L, U}) -> + {C+1, R, L, U}; + (#ioq_request{class=internal_repl}, {C, R, L, U}) -> + {C, R+1, L, U}; + (#ioq_request{class=low}, {C, R, L, U}) -> + {C, R, L+1, U}; + (#ioq_request{class=Class, user=User}, {C, R, L, U}) -> + [UI0, UDB0, UV0] = case khash:get(U, User) of + undefined -> + [0,0,0]; + UC0 -> + UC0 + end, + UC = case Class of + db_update -> + [UI0, UDB0+1, UV0]; + view_update -> + [UI0, UDB0, UV0+1]; + _Interactive -> + [UI0+1, UDB0, UV0] + end, + ok = khash:put(U, User, UC), + {C, R, L, U} + end, + {0, 0, 0, Users0}, + Reqs + ), + [ + {compaction, Compaction}, + {replication, Replication}, + {low, Low}, + {channels, {khash:to_list(Users)}} + ]. 
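+
+
+%% For illustration, the fixed-element depth list returned above looks like
+%% the following (values taken from queue_depths_test_/0 in the eunit suite),
+%% where each channel triple is [Interactive, DbUpdate, ViewUpdate]:
+%%
+%%     [{compaction, 3},
+%%      {replication, 3},
+%%      {low, 1},
+%%      {channels, {[{<<"foo">>, [2,1,4]},
+%%                   {<<"bar">>, [1,3,1]}]}}]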
+ + +-spec get_concurrency() -> non_neg_integer(). +get_concurrency() -> + case pcall(get_concurrency, 500) of + {ok, Concurrencies} -> + lists:sum(Concurrencies); + {error, _} -> + ?BAD_MAGIC_NUM + end. + + +-spec set_concurrency(non_neg_integer()) -> non_neg_integer(). +set_concurrency(C) when is_integer(C), C > 0 -> + lists:foldl( + fun(Pid, Total) -> + {ok, Old} = gen_server:call(Pid, {set_concurrency, C}, 1000), + Total + Old + end, + 0, + ioq_sup:get_ioq2_servers() + ); +set_concurrency(_) -> + erlang:error(badarg). + + +get_counters() -> + undefined. + + +%% @equiv get_state(?SERVER_ID(1)) +-spec get_state() -> any(). +get_state() -> + get_state(?SERVER_ID(1)). + + +%% Returns a mutated #state{} with list representations of khash/hqueue objects +-spec get_state(atom()) -> any(). +get_state(Server) -> + gen_server:call(Server, get_state, infinity). + + +-spec update_config() -> ok. +update_config() -> + gen_server:call(?SERVER_ID(1), update_config, infinity). + + +start_link(Name, SID, Bind) -> + Options = case Bind of + true -> [{scheduler, SID}]; + false -> [] + end, + gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options). + + +init([Name, SID]) -> + {ok, HQ} = hqueue:new(), + {ok, Reqs} = khash:new(), + {ok, Waiters} = khash:new(), + State = #state{ + queue = HQ, + reqs = Reqs, + waiters = Waiters, + server_name = Name, + scheduler_id = SID + }, + {ok, update_config_int(State)}. 
+ + +handle_call(get_state, _From, State) -> + Resp = State#state{ + user_p = khash:to_list(State#state.user_p), + class_p = khash:to_list(State#state.class_p), + shard_p = khash:to_list(State#state.shard_p), + reqs = khash:to_list(State#state.reqs), + waiters = khash:to_list(State#state.waiters), + queue = hqueue:to_list(State#state.queue) + }, + + {reply, Resp, State, 0}; +handle_call(#ioq_request{} = Req, From, State) -> + {noreply, enqueue_request(Req#ioq_request{from=From}, State), 0}; +handle_call({hqueue, Method, Args}, _From, #state{queue=HQ}=State) -> + Resp = erlang:apply(hqueue, Method, [HQ | Args]), + {reply, Resp, State, 0}; +handle_call(update_config, _From, State) -> + {reply, ok, update_config_int(State), 0}; +handle_call(get_concurrency, _From, State) -> + {reply, State#state.concurrency, State, 0}; +handle_call({set_concurrency, C}, _From, State) when is_integer(C), C > 0 -> + {reply, {ok, State#state.concurrency}, State#state{concurrency = C}, 0}; +handle_call(get_reqs, _From, #state{reqs=Reqs}=State) -> + {reply, khash:to_list(Reqs), State, 0}; +handle_call(get_pending_reqs, _From, #state{queue=HQ}=State) -> + {reply, hqueue:to_list(HQ), State, 0}; +handle_call(get_counters, _From, State) -> + {reply, undefined, State, 0}; +handle_call(_, _From, State) -> + {reply, ok, State, 0}. + + +handle_cast(update_config, State) -> + {noreply, update_config_int(State), 0}; +handle_cast(_Msg, State) -> + {noreply, State, 0}. 
+ + +handle_info({Ref, Reply}, #state{reqs = Reqs} = State) -> + case khash:get(Reqs, Ref) of + undefined -> + ok; + #ioq_request{ref=Ref}=Req -> + ok = khash:del(Reqs, Ref), + TResponse = os:timestamp(), + ServiceTime = time_delta(TResponse, Req#ioq_request.tsub), + IOWait = time_delta(TResponse, Req#ioq_request.t0), + couch_stats:update_histogram( + [couchdb, io_queue2, svctm], ServiceTime), + couch_stats:update_histogram([couchdb, io_queue2, iowait], IOWait), + erlang:demonitor(Ref, [flush]), + send_response(State#state.waiters, Req, Reply) + end, + {noreply, State, 0}; +handle_info({'DOWN', Ref, _, _, Reason}, #state{reqs = Reqs} = State) -> + case khash:get(Reqs, Ref) of + undefined -> + ok; + #ioq_request{ref=Ref}=Req -> + couch_stats:increment_counter([couchdb, io_queue2, io_errors]), + ok = khash:del(Reqs, Ref), + send_response(State#state.waiters, Req, {'EXIT', Reason}) + end, + {noreply, State, 0}; +handle_info(timeout, State) -> + {noreply, maybe_submit_request(State)}; +handle_info(_Info, State) -> + {noreply, State, 0}. + + +terminate(_Reason, _State) -> + ok. + + +code_change(_OldVsn, #state{}=State, _Extra) -> + {ok, State}. + + +-spec update_config_int(state()) -> state(). 
+update_config_int(State) -> + Concurrency = config:get_integer("ioq2", "concurrency", ?DEFAULT_CONCURRENCY), + ResizeLimit = config:get_integer("ioq2", "resize_limit", ?DEFAULT_RESIZE_LIMIT), + DeDupe = config:get_boolean("ioq2", "dedupe", true), + + ScaleFactor = ioq_config:to_float( + config:get("ioq2", "scale_factor"), + ?DEFAULT_SCALE_FACTOR + ), + + MaxPriority = ioq_config:to_float( + config:get("ioq2", "max_priority"), + ?DEFAULT_MAX_PRIORITY + ), + + {ok, ClassP} = ioq_config:build_class_priorities(), + {ok, UserP} = ioq_config:build_user_priorities(), + {ok, ShardP} = ioq_config:build_shard_priorities(), + + State#state{ + user_p = UserP, + class_p = ClassP, + shard_p = ShardP, + scale_factor = ScaleFactor, + concurrency = Concurrency, + dedupe = DeDupe, + resize_limit = ResizeLimit, + max_priority = MaxPriority + }. + + +-spec maybe_submit_request(state()) -> state(). +maybe_submit_request(#state{reqs=Reqs, concurrency=C}=State) -> + NumReqs = khash:size(Reqs), + case NumReqs < C of + true -> + case make_next_request(State) of + State -> + State; + NewState when NumReqs >= C-1 -> + NewState; + NewState -> + maybe_submit_request(NewState) + end; + false -> + State + end. + + +-spec make_next_request(state()) -> state(). +make_next_request(#state{queue=HQ}=State) -> + case hqueue:extract_max(HQ) of + {error, empty} -> + State; + {Priority, #ioq_request{} = Req} -> + submit_request(Req#ioq_request{fin_priority=Priority}, State) + end. + + +-spec submit_request(ioq_request(), state()) -> state(). +submit_request(Req, #state{iterations=I, resize_limit=RL}=State) when I >= RL -> + ok = hqueue:scale_by(State#state.queue, State#state.scale_factor), + submit_request(Req, State#state{iterations=0}); +submit_request(Req, #state{iterations=Iterations}=State) -> + #ioq_request{ + fd = Fd, + msg = Call, + class = Class, + t0 = T0 + } = Req, + #state{reqs = Reqs} = State, + + % make the request + Ref = erlang:monitor(process, Fd), + Fd ! 
{'$gen_call', {self(), Ref}, Call}, + + % record some stats + RW = rw(Call), + + SubmitTime = os:timestamp(), + Latency = time_delta(SubmitTime, T0), + couch_stats:increment_counter([couchdb, io_queue2, Class, count]), + couch_stats:increment_counter([couchdb, io_queue2, RW, count]), + couch_stats:update_histogram([couchdb, io_queue2, submit_delay], Latency), + khash:put(Reqs, Ref, Req#ioq_request{tsub=SubmitTime, ref=Ref}), + State#state{iterations=Iterations+1}. + + +-spec send_response(khash:khash(), ioq_request(), term()) -> [ok]. +send_response(Waiters, #ioq_request{key=Key}, Reply) -> + Waiting = khash:get(Waiters, Key), + khash:del(Waiters, Key), + [gen_server:reply(W, Reply) || W <- Waiting]. + + +-spec waiter_key(ioq_request(), state()) -> {waiter_key(), state()}. +waiter_key(Req, State) -> + case {State#state.dedupe, Req#ioq_request.msg} of + {true, {pread_iolist, Pos}} -> + {{Req#ioq_request.fd, Pos}, State}; + _ -> + Next = State#state.next_key, + {Next, State#state{next_key = Next + 1}} + end. + + +-spec enqueue_request(ioq_request(), state()) -> state(). +enqueue_request(Req, #state{queue=HQ, waiters=Waiters}=State0) -> + #ioq_request{ + from = From, + msg = Msg + } = Req, + {ReqKey, State} = waiter_key(Req, State0), + RW = rw(Msg), + + couch_stats:increment_counter([couchdb, io_queue2, queued]), + couch_stats:increment_counter([couchdb, io_queue2, RW, queued]), + + case khash:get(State#state.waiters, ReqKey, not_found) of + not_found -> + Priority = prioritize_request(Req, State), + Req1 = Req#ioq_request{ + key = ReqKey, + init_priority = Priority + }, + hqueue:insert(HQ, Priority, Req1), + khash:put(State#state.waiters, ReqKey, [From]); + Pids -> + couch_stats:increment_counter([couchdb, io_queue2, merged]), + khash:put(Waiters, ReqKey, [From | Pids]) + end, + State. + + +-spec add_request_dimensions(ioq_request(), io_dimensions()) -> ioq_request(). 
+add_request_dimensions(Request, {Class, Shard}) ->
+    add_request_dimensions(Request, {Class, Shard, undefined});
+add_request_dimensions(Request, {Class, Shard0, GroupId}) ->
+    {Shard, User, DbName} = case {Class, Shard0} of
+        {interactive, "dbcopy"} ->
+            {undefined, undefined, undefined};
+        {db_meta, security} ->
+            {undefined, undefined, undefined};
+        _ ->
+            Shard1 = filename:rootname(Shard0),
+            {User0, DbName0} = shard_info(Shard1),
+            {Shard1, User0, DbName0}
+    end,
+    Request#ioq_request{
+        shard = Shard,
+        user = User,
+        db = DbName,
+        ddoc = GroupId,
+        class = Class
+    };
+add_request_dimensions(Request, undefined) ->
+    Request#ioq_request{class=other}.
+
+
+-spec shard_info(dbname()) -> {any(), any()}.
+shard_info(Shard) ->
+    try split(Shard) of
+        [<<"shards">>, _, <<"heroku">>, AppId, DbName] ->
+            {<<AppId/binary, ".heroku">>, DbName};
+        [<<"shards">>, _, DbName] ->
+            {system, DbName};
+        [<<"shards">>, _, Account, DbName] ->
+            {Account, DbName};
+        [<<"shards">>, _, Account | DbParts] ->
+            {Account, filename:join(DbParts)};
+        _ ->
+            {undefined, undefined}
+    catch _:_ ->
+        {undefined, undefined}
+    end.
+
+
+-spec split(binary()) -> [binary()]
+         ; ([binary()]) -> [binary()].
+split(B) when is_binary(B) ->
+    split(B, 0, 0, []);
+split(B) ->
+    B.
+
+-spec split(binary(), non_neg_integer(), non_neg_integer(), [binary()]) -> [binary()].
+split(B, O, S, Acc) ->
+    case B of
+        <<_:O/binary>> ->
+            Len = O - S,
+            <<_:S/binary, Part:Len/binary>> = B,
+            lists:reverse(Acc, [Part]);
+        <<_:O/binary, $/, _/binary>> ->
+            Len = O - S,
+            <<_:S/binary, Part:Len/binary, _/binary>> = B,
+            split(B, O+1, O+1, [Part | Acc]);
+        _ ->
+            split(B, O+1, S, Acc)
+    end.
+
+-spec time_delta(T1, T0) -> Tdiff when
+    T1 :: erlang:timestamp(),
+    T0 :: erlang:timestamp(),
+    Tdiff :: integer().
+time_delta(T1, T0) ->
+    trunc(timer:now_diff(T1, T0) / 1000).
+
+
+-spec rw(io_dimensions()) -> read_write().
+rw({pread_iolist, _}) ->
+    reads;
+rw({append_bin, _}) ->
+    writes;
+rw({append_bin, _, _}) ->
+    writes;
+rw(_) ->
+    unknown.
+
+
+-spec prioritize_request(ioq_request(), state()) -> priority().
+prioritize_request(Req, State) ->
+    #state{
+        class_p = ClassP,
+        user_p = UserP,
+        shard_p = ShardP,
+        max_priority = Max
+    } = State,
+    case ioq_config:prioritize(Req, ClassP, UserP, ShardP) of
+        Priority when Priority < 0.0 -> 0.0;
+        Priority when Priority > Max -> Max;
+        Priority -> Priority
+    end.
+
+
+-spec maybe_seed() -> {integer(), integer(), integer()}.
+maybe_seed() ->
+    case get(random_seed) of
+        undefined ->
+            <<A:32, B:32, C:32>> = crypto:strong_rand_bytes(12),
+            Seed = {A, B, C},
+            random:seed(Seed),
+            Seed;
+        Seed ->
+            Seed
+    end.
+
+
+%% ioq_server2 Tests
+
+
+-ifdef(TEST).
+
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+mock_server() ->
+    mock_server([]).
+
+
+mock_server(Config) ->
+    meck:new(config),
+    meck:expect(config, get, fun(Group) ->
+        couch_util:get_value(Group, Config, [])
+    end),
+    meck:expect(config, get, fun(_,_) ->
+        undefined
+    end),
+    meck:expect(config, get, fun("ioq2", _, Default) ->
+        Default
+    end),
+    meck:expect(config, get_integer, fun("ioq2", _, Default) ->
+        Default
+    end),
+    meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
+        Default
+    end),
+    {ok, State} = ioq_server2:init([?SERVER_ID(1), 1]),
+    State.
+
+
+unmock_server(_) ->
+    true = meck:validate(config),
+    ok = meck:unload(config).
+
+
+empty_config_test_() ->
+    {
+        "Empty config tests",
+        {
+            foreach,
+            fun mock_server/0,
+            fun unmock_server/1,
+            [
+                fun test_basic_server_config/1,
+                fun test_simple_request_priority/1,
+                fun test_simple_dedupe/1,
+                fun test_io_error/1
+            ]
+        }
+    }.
+ + +simple_config_test_() -> + { + "Simple config tests", + { + foreach, + fun() -> + Config = [ + {"ioq2.classes", [{"db_compact", "0.9"}]}, + {"ioq2", [{"resize_limit", "10"}]} + ], + mock_server(Config) + end, + fun unmock_server/1, + [ + fun test_simple_config/1, + fun test_auto_scale/1 + ] + } + }. + + +priority_extremes_test_() -> + { + "Test min/max priorities", + { + foreach, + fun() -> + Config = [ + {"ioq2.classes", [ + {"db_compact", "9999999.0"}, + {"interactive", "-0.00000000000000001"} + ]} + ], + mock_server(Config) + end, + fun unmock_server/1, + [ + fun test_min_max_priorities/1 + ] + } + }. + + +queue_depths_test_() -> + Foo = <<"foo">>, + Bar = <<"bar">>, + Reqs = [ + #ioq_request{user=Foo, class=db_compact}, + #ioq_request{user=Bar, class=db_compact}, + #ioq_request{user=Bar, class=view_compact}, + #ioq_request{user=Foo, class=internal_repl}, + #ioq_request{user=Bar, class=internal_repl}, + #ioq_request{user=Bar, class=internal_repl}, + #ioq_request{class=low}, + + #ioq_request{user=Foo, class=interactive}, + #ioq_request{user=Foo, class=interactive}, + #ioq_request{user=Foo, class=db_update}, + #ioq_request{user=Foo, class=view_update}, + #ioq_request{user=Foo, class=view_update}, + #ioq_request{user=Foo, class=view_update}, + #ioq_request{user=Foo, class=view_update}, + + #ioq_request{user=Bar, class=interactive}, + #ioq_request{user=Bar, class=db_update}, + #ioq_request{user=Bar, class=db_update}, + #ioq_request{user=Bar, class=db_update}, + #ioq_request{user=Bar, class=view_update} + ], + Expected = [ + {compaction, 3}, + {replication, 3}, + {low, 1}, + {channels, {[ + {<<"foo">>, [2,1,4]}, + {<<"bar">>, [1,3,1]} + ]}} + ], + + { + "Test queue depth stats", + ?_assertEqual( + Expected, + get_queue_depths(Reqs) + ) + }. 
+ + +test_basic_server_config(St0) -> + {reply, RespState, _St1, 0} = handle_call(get_state, pid, St0), + [ + ?_assertEqual([], RespState#state.user_p), + ?_assertEqual( + lists:sort(?DEFAULT_CLASS_PRIORITIES), + lists:sort(RespState#state.class_p) + ), + ?_assertEqual([], RespState#state.shard_p) + ]. + + +test_simple_request_priority(St0) -> + From = pid1, + Request0 = #ioq_request{class=db_compact}, + Priority = prioritize_request(Request0, St0), + Request1 = Request0#ioq_request{ + init_priority = Priority, + from = From, + key = St0#state.next_key + }, + {noreply, St1, 0} = handle_call(Request0, From, St0), + {reply, RespState, _St2, 0} = handle_call(get_state, From, St1), + [ + ?_assertEqual( + [{Priority, Request1}], + RespState#state.queue + ) + ]. + + +test_simple_dedupe(St0) -> + FromA = pid1, + FromB = pid2, + Fd = fd, + Pos = 1234, + Msg = {pread_iolist, Pos}, + Request0 = #ioq_request{ + class=db_compact, + fd = Fd, + msg = Msg + }, + {ReqKey, St1} = waiter_key(Request0, St0), + Priority = prioritize_request(Request0, St1), + Request1A = Request0#ioq_request{ + init_priority = Priority, + from = FromA, + key = {Fd, Pos} + }, + _Request1B = Request0#ioq_request{ + init_priority = Priority, + from = FromA, + key = {Fd, Pos} + }, + {noreply, St2, 0} = handle_call(Request0, FromA, St1), + {noreply, St3, 0} = handle_call(Request0, FromB, St2), + {reply, RespState, _St4, 0} = handle_call(get_state, FromA, St3), + [ + ?_assertEqual( + [{Priority, Request1A}], + RespState#state.queue + ), + ?_assertEqual( + [{ReqKey, [FromB, FromA]}], + RespState#state.waiters + ) + ]. + + +test_simple_config(St) -> + RequestA = #ioq_request{}, + RequestB = #ioq_request{class=db_compact}, + PriorityA = prioritize_request(RequestA, St), + PriorityB = prioritize_request(RequestB, St), + + [ + ?_assertEqual( + 1.0, + PriorityA + ), + ?_assertEqual( + 0.9, + PriorityB + ) + ]. 
+ + +test_min_max_priorities(St) -> + RequestA = #ioq_request{class=interactive}, + RequestB = #ioq_request{class=db_compact}, + PriorityA = prioritize_request(RequestA, St), + PriorityB = prioritize_request(RequestB, St), + + [ + ?_assertEqual( + 0.0, + PriorityA + ), + ?_assertEqual( + ?DEFAULT_MAX_PRIORITY, + PriorityB + ) + ]. + + +test_auto_scale(#state{queue=HQ}=St0) -> + %% start with iterations=2 so we can tell when we auto-scaled + St1 = St0#state{resize_limit=10, iterations=2}, + Pid = spawn(fun() -> receive baz -> ok end end), + T0 = os:timestamp(), + BaseReq = #ioq_request{t0=T0, fd=Pid}, + + RequestA = BaseReq#ioq_request{ref=make_ref()}, + RequestB = BaseReq#ioq_request{class=db_compact, ref=make_ref()}, + PriorityA = prioritize_request(RequestA, St1), + PriorityB = prioritize_request(RequestB, St1), + + {noreply, St2, 0} = handle_call(RequestB, Pid, St1), + {noreply, St3, 0} = handle_call(RequestA, Pid, St2), + + {_, #ioq_request{init_priority=PriorityA2}} = hqueue:extract_max(HQ), + Tests0 = [?_assertEqual(PriorityA, PriorityA2)], + {_St, Tests} = lists:foldl( + fun(_N, {#state{iterations=I, resize_limit=RL}=StN0, TestsN}) -> + ReqN = BaseReq#ioq_request{ref=make_ref()}, + ExpectedPriority = case I == 1 of + false -> PriorityA; + true -> PriorityB + end, + {noreply, StN1, 0} = handle_call(ReqN, Pid, StN0), + StN2 = submit_request(ReqN, StN1), + {_, #ioq_request{init_priority=PriorityN}} = hqueue:extract_max(HQ), + { + StN2, + [?_assertEqual(ExpectedPriority, PriorityN) | TestsN] + } + end, + {St3, Tests0}, + lists:seq(1, St3#state.resize_limit + 7) + ), + lists:reverse(Tests). + + +all_test_() -> + {setup, fun setup/0, fun cleanup/1, fun instantiate/1}. + + +many_clients_test_() -> + FDCount = 50, + ClientCount = 10, + MaxDelay = 20, + { + setup, + fun() -> setup_many(FDCount, MaxDelay) end, + fun cleanup/1, + fun(Servers) -> test_many_clients(Servers, ClientCount) end + }. 
+ + +setup() -> + meck:new(config, [passthrough]), + meck:expect(config, get_boolean, + fun + ("ioq2", "enabled", _) -> + true; + ("ioq2", "server_per_scheduler", _) -> + false; + (_, _, Default) -> + Default + end + ), + {ok, _} = application:ensure_all_started(ioq), + FakeServer = fun(F) -> + receive {'$gen_call', {Pid, Ref}, Call} -> + Pid ! {Ref, {reply, Call}} + end, + F(F) + end, + spawn(fun() -> FakeServer(FakeServer) end). + + +setup_many(Count, RespDelay) -> + {ok, _} = application:ensure_all_started(ioq), + meck:new(config, [passthrough]), + meck:expect(config, get_boolean, + fun + ("ioq2", "enabled", _) -> + true; + ("ioq2", "server_per_scheduler", _) -> + false; + (_, _, Default) -> + Default + end + ), + FakeServer = fun(F) -> + receive {'$gen_call', {Pid, Ref}, Call} -> + timer:sleep(random:uniform(RespDelay)), + Pid ! {Ref, {reply, Call}} + end, + F(F) + end, + [spawn(fun() -> FakeServer(FakeServer) end) || _ <- lists:seq(1, Count)]. + + +cleanup(Server) when not is_list(Server) -> + cleanup([Server]); +cleanup(Servers) -> + ok = application:stop(ioq), + true = meck:validate(config), + ok = meck:unload(config), + [exit(Server, kill) || Server <- Servers]. + + +instantiate(S) -> + Old = ?DEFAULT_CONCURRENCY * length(ioq_sup:get_ioq2_servers()), + [{inparallel, lists:map(fun(IOClass) -> + lists:map(fun(Shard) -> + check_call(S, make_ref(), priority(IOClass, Shard)) + end, shards()) + end, io_classes())}, + ?_assertEqual(Old, ioq:set_disk_concurrency(10)), + ?_assertError(badarg, ioq:set_disk_concurrency(0)), + ?_assertError(badarg, ioq:set_disk_concurrency(-1)), + ?_assertError(badarg, ioq:set_disk_concurrency(foo))]. + + +check_call(Server, Call, Priority) -> + ?_assertEqual({reply, Call}, ioq_server2:call(Server, Call, Priority)). + + +io_classes() -> [interactive, view_update, db_compact, view_compact, + internal_repl, other, db_meta]. 
+ + +shards() -> + [ + <<"shards/0-1/heroku/app928427/couchrest.1317609656.couch">>, + <<"shards/0-1/foo">>, + <<"shards/0-3/foo">>, + <<"shards/0-1/bar">>, + <<"shards/0-1/kocolosk/stats.1299297461.couch">>, + <<"shards/0-1/kocolosk/my/db.1299297457.couch">>, + other + ]. + + +priority(view_update, Shard) -> + {view_update, Shard, <<"_design/foo">>}; +priority(Any, Shard) -> + {Any, Shard}. + + +test_many_clients(Servers, ClientCount) -> + ClientFun = fun() -> + ok = lists:foreach(fun(IOClass) -> + ok = lists:foreach(fun(Shard) -> + Server = random_server(Servers), + Ref = make_ref(), + Priority = priority(IOClass, Shard), + {reply, Ref} = ioq_server2:call(Server, Ref, Priority), + ok + end, shards()) + end, io_classes()), + ok + end, + ok = lists:foreach(fun(_) -> spawn_monitor(ClientFun) end, lists:seq(1, ClientCount)), + + Status = wait_for_success(ClientCount), + ?_assert(Status). + + +wait_for_success(0) -> + true; +wait_for_success(Count) when Count > 0 -> + receive + {'DOWN', _Ref, process, _Pid, normal} -> + wait_for_success(Count - 1); + Msg -> + ?debugFmt("UNEXPECTED CLIENT EXIT: ~p~n", [Msg]), + false + end. + + +random_server(Servers) -> + lists:nth(random:uniform(length(Servers)), Servers). + + +test_io_error(#state{waiters=Waiters, reqs=Reqs}=State) -> + Key = asdf, + Ref = make_ref(), + RefTag = make_ref(), + Req = #ioq_request{ref=Ref, key=Key}, + khash:put(Waiters, Key, [{self(), RefTag}]), + khash:put(Reqs, Ref, Req), + Error = {exit, foo}, + {noreply, _State1, 0} = handle_info({'DOWN', Ref, baz, zab, Error}, State), + Resp = receive + {RefTag, {'EXIT', Error}} -> + {ok, Error}; + Else -> + {error, Else} + after 5000 -> + {error, timeout} + end, + ?_assertEqual({ok, Error}, Resp). + + +-endif. diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl index c4d04a9..7ea6284 100644 --- a/src/ioq_sup.erl +++ b/src/ioq_sup.erl @@ -13,6 +13,7 @@ -module(ioq_sup). -behaviour(supervisor). -export([start_link/0, init/1]). +-export([get_ioq2_servers/0]). 
%% Helper macro for declaring children of supervisor -define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). @@ -21,4 +22,28 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, { {one_for_one, 5, 10}, [?CHILD(ioq, worker)]}}. + ok = ioq_config_listener:subscribe(), + IOQ2Children = ioq_server2_children(), + {ok, { + {one_for_one, 5, 10}, + [ + ?CHILD(ioq_server, worker), + ?CHILD(ioq_osq, worker) + | IOQ2Children + ] + }}. + +ioq_server2_children() -> + Bind = config:get_boolean("ioq2", "bind_to_schedulers", false), + ioq_server2_children(erlang:system_info(schedulers), Bind). + +ioq_server2_children(Count, Bind) -> + lists:map(fun(I) -> + Name = list_to_atom("ioq_server_" ++ integer_to_list(I)), + {Name, {ioq_server2, start_link, [Name, I, Bind]}, permanent, 5000, worker, [Name]} + end, lists:seq(1, Count)). + +get_ioq2_servers() -> + lists:map(fun(I) -> + list_to_atom("ioq_server_" ++ integer_to_list(I)) + end, lists:seq(1, erlang:system_info(schedulers))). diff --git a/test/ioq_config_tests.erl b/test/ioq_config_tests.erl new file mode 100644 index 0000000..d3fee81 --- /dev/null +++ b/test/ioq_config_tests.erl @@ -0,0 +1,157 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_config_tests). + + +-include_lib("ioq/include/ioq.hrl"). +-include_lib("eunit/include/eunit.hrl"). + + +-define(USERS_CONFIG, [{"bar","2.4"},{"baz","4.0"},{"foo","1.2"}]). 
+-define(CLASSES_CONFIG, [{"view_compact","2.4"},{"db_update","1.9"}]). +-define(SHARDS_CONFIG, [ + { + {<<"shards/00000000-1fffffff/foo/pizza_db">>, db_update}, + 5.0 + }, + { + {<<"shards/00000000-1fffffff/foo/pizza_db">>, view_update}, + 7.0 + } +]). + + +priorities_test_() -> + {ok, ShardP} = ioq_config:build_shard_priorities(?SHARDS_CONFIG), + {ok, UserP} = ioq_config:build_user_priorities(?USERS_CONFIG), + {ok, ClassP} = ioq_config:build_class_priorities(?CLASSES_CONFIG), + Tests = [ + %% {User, Shard, Class, UP * SP * CP} + {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, db_update, 1.2 * 5.0 * 1.9}, + {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, view_update, 1.2 * 7.0 * 1.0}, + {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, view_compact, 1.2 * 1.0 * 2.4}, + {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, db_compact, 1.2 * 1.0 * 0.0001}, + {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, internal_repl, 1.2 * 1.0 * 0.001}, + {<<"baz">>, undefined, internal_repl, 4 * 1.0 * 0.001} + ], + lists:map( + fun({User, Shard, Class, Priority}) -> + Req = #ioq_request{user=User, shard=Shard, class=Class}, + ?_assertEqual( + Priority, + ioq_config:prioritize(Req, ClassP, UserP, ShardP) + ) + end, + Tests + ). + + +parse_shard_string_test_() -> + Shard = "shards/00000000-1fffffff/foo/pizza_db", + Classes = ["db_update", "view_update", "view_compact", "db_compact"], + lists:map( + fun(Class) -> + ShardString = Shard ++ "||" ++ Class, + ?_assertEqual( + {list_to_binary(Shard), list_to_existing_atom(Class)}, + ioq_config:parse_shard_string(ShardString) + ) + end, + Classes + ). + + +parse_bad_string_test_() -> + Shard = "shards/00000000-1fffffff/foo/pizza_db$$$$$ASDF", + ?_assertEqual( + {error, Shard}, + ioq_config:parse_shard_string(Shard) + ). 
+ + +to_float_test_() -> + Default = 123456789.0, + Tests = [ + {0.0, 0}, + {0.0, "0"}, + {1.0, "1"}, + {1.0, 1}, + {7.9, 7.9}, + {7.9, "7.9"}, + {79.0, "79"}, + {Default, "asdf"} + ], + [?_assertEqual(E, ioq_config:to_float(T, Default)) || {E, T} <- Tests]. + + +config_set_test_() -> + { + "Test ioq_config setters", + { + foreach, + fun() -> test_util:start_applications([config, couch_log]) end, + fun(_) -> test_util:stop_applications([config, couch_log]) end, + [ + fun check_simple_configs/1, + fun check_bypass_configs/1 + ] + } + }. + + +check_simple_configs(_) -> + Defaults = [ + {"concurrency", "1"}, + {"resize_limit", "1000"}, + {"dedupe", "true"}, + {"scale_factor", "2.0"}, + {"max_priority", "10000.0"}, + {"enabled", "false"}, + {"dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER} + ], + SetTests = [ + {set_concurrency, 9, "9"}, + {set_resize_limit, 8888, "8888"}, + {set_dedupe, false, "false"}, + {set_scale_factor, 3.14, "3.14"}, + {set_max_priority, 99999.99, "99999.99"}, + {set_enabled, true, "true"}, + {set_dispatch_strategy, ?DISPATCH_FD_HASH, ?DISPATCH_FD_HASH} + ], + + Reason = "ioq_config_tests", + %% Custom assert for handling floats as strings + Assert = fun(Expected0, Value0) -> + ?_assertEqual( + ioq_config:to_float(Expected0, Expected0), + ioq_config:to_float(Value0, Value0) + ) + end, + + Tests0 = lists:map(fun({Key, Default}) -> + Value = config:get("ioq2", Key, Default), + ?_assertEqual(Default, Value) + end, Defaults), + + lists:foldl(fun({Fun, Value, Result}, Acc) -> + ok = ioq_config:Fun(Value, Reason), + Key = lists:sublist(atom_to_list(Fun), 5, 9999), + Value1 = config:get("ioq2", Key, undefined), + [Assert(Result, Value1) | Acc] + end, Tests0, SetTests). + + +check_bypass_configs(_) -> + ok = ioq_config:set_bypass(interactive, true, "Bypassing interactive"), + Value = config:get_boolean("ioq2.bypass", "interactive", false), + ?_assertEqual(true, Value). 
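The config tests above pin down the multiplicative prioritization scheme described in the overview: a request's effective priority is the product of its user multiplier, its shard/class multiplier, and its class multiplier, with 1.0 as the default for anything unconfigured, and baked-in low defaults for `db_compact` (0.0001) and `internal_repl` (0.001) implied by the expected values in `priorities_test_()`. A minimal, language-neutral sketch of that computation in Python — the names and dict-based lookup here are illustrative, not the `ioq_config` API:

```python
# Sketch of the chained-multiplier prioritization exercised by
# priorities_test_(). The class defaults below are inferred from the
# expected test values; anything unconfigured falls back to 1.0.
DEFAULT_CLASS_PRIORITIES = {"db_compact": 0.0001, "internal_repl": 0.001}

def prioritize(user, shard, io_class, user_p, shard_p, class_p):
    """Priority = user multiplier * shard/class multiplier * class multiplier."""
    up = user_p.get(user, 1.0)
    sp = shard_p.get((shard, io_class), 1.0)
    cp = class_p.get(io_class, DEFAULT_CLASS_PRIORITIES.get(io_class, 1.0))
    return up * sp * cp

# Configuration mirroring ?USERS_CONFIG, ?CLASSES_CONFIG, ?SHARDS_CONFIG.
users = {"foo": 1.2, "bar": 2.4, "baz": 4.0}
classes = {"view_compact": 2.4, "db_update": 1.9}
shard = "shards/00000000-1fffffff/foo/pizza_db"
shards = {(shard, "db_update"): 5.0, (shard, "view_update"): 7.0}

# Mirrors the first test row: 1.2 (user) * 5.0 (shard/class) * 1.9 (class)
p = prioritize("foo", shard, "db_update", users, shards, classes)
```

Because the scheme is a plain product, extending it later (e.g. with the CCM tier or per-user cost feedback mentioned in the overview) is just one more multiplier in the chain.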
diff --git a/test/ioq_kv_tests.erl b/test/ioq_kv_tests.erl new file mode 100644 index 0000000..3c34573 --- /dev/null +++ b/test/ioq_kv_tests.erl @@ -0,0 +1,149 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_kv_tests). +-behaviour(proper_statem). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + + +-export([ + initial_state/0, + command/1, + precondition/2, + postcondition/3, + next_state/3, + random_key/1 +]). + +-record(st, {kvs}). + +proper_test_() -> + PropErOpts = [ + {to_file, user}, + {max_size, 5}, + {numtests, 1000} + ], + {timeout, 3600, ?_assertEqual([], proper:module(?MODULE, PropErOpts))}. + + +prop_ioq_kvs_almost_any() -> + ?FORALL({K, V}, kvs(), begin + case (catch ioq_kv:put(K, V)) of + ok -> ioq_kv:get(K) == V; + {'EXIT', {invalid_term, _}} -> true; + _ -> false + end + end). + + +prop_ioq_kvs() -> + ?FORALL(Cmds, commands(?MODULE), + begin + cleanup(), + {H, S, R} = run_commands(?MODULE, Cmds), + ?WHENFAIL( + io:format("History: ~p\nState: ~p\nRes: ~p\n", [H,S,R]), + R =:= ok + ) + end + ). + +initial_state() -> + #st{kvs=dict:new()}. + + +command(S) -> + Key = {call, ioq_kv_tests, random_key, [S]}, + frequency([ + {1, {call, ioq_kv, init, []}}, + {9, {call, ioq_kv, get, [Key]}}, + {1, {call, ioq_kv, get, [key()]}}, + {9, {call, ioq_kv, put, [Key, val()]}}, + {1, {call, ioq_kv, put, [key(), val()]}}, + {2, {call, ioq_kv, delete, [Key]}}, + {1, {call, ioq_kv, delete, [key()]}} + ]). 
+ + +precondition(_, _) -> + true. + + +postcondition(_S, {call, _, init, []}, ok) -> + true; +postcondition(S, {call, _, get, [Key]}, Val) -> + case dict:is_key(Key, S#st.kvs) of + true -> + case dict:find(Key, S#st.kvs) of + {ok, Val} -> true; + _ -> false + end; + false -> + case Val of + undefined -> true; + _ -> false + end + end; +postcondition(_S, {call, _, put, [_Key, _Val]}, ok) -> + true; +postcondition(_S, {call, _, delete, [_Key]}, ok) -> + true; +postcondition(_S, _, _) -> + false. + + +next_state(S, _V, {call, _, init, []}) -> + S; +next_state(S, _V, {call, _, get, [_Key]}) -> + S; +next_state(S, _V, {call, _, put, [Key, Val]}) -> + S#st{ + kvs={call, dict, store, [Key, Val, S#st.kvs]} + }; +next_state(S, _V, {call, _, delete, [Key]}) -> + S#st{ + kvs={call, dict, erase, [Key, S#st.kvs]} + }. + + +random_key(#st{kvs=KVs}) -> + Keys0 = dict:fetch_keys(KVs), + Keys = lists:append(Keys0, [foo]), + NumKeys = erlang:length(Keys), + KeyPos = random:uniform(NumKeys), + lists:nth(KeyPos, Keys). + +cleanup() -> + code:purge(ioq_kv_dyn), + code:delete(ioq_kv_dyn). + + +% Generators + +key() -> almost_any(). +val() -> almost_any(). +kvs() -> {any(), any()}. + +% ioq_kv can't handle storing bitstrings that don't have +% a length divisible by 8. Instead of being clever I +% just define an almost any. +almost_any() -> + oneof([ + integer(), + float(), + atom(), + binary(), + ?LAZY(loose_tuple(almost_any())), + ?LAZY(list(almost_any())) + ]). diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl new file mode 100644 index 0000000..b6b7bad --- /dev/null +++ b/test/ioq_tests.erl @@ -0,0 +1,68 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_tests). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all_test_() -> + {setup, fun setup/0, fun cleanup/1, fun instantiate/1}. + +setup() -> + Apps = test_util:start_applications([ + config, folsom, couch_log, couch_stats, ioq + ]), + FakeServer = fun(F) -> + receive {'$gen_call', {Pid, Ref}, Call} -> + Pid ! {Ref, {reply, Call}} + end, + F(F) + end, + {Apps, spawn(fun() -> FakeServer(FakeServer) end)}. + +cleanup({Apps, Server}) -> + test_util:stop_applications(Apps), + exit(Server, kill). + +instantiate({_, S}) -> + [{inparallel, lists:map(fun(IOClass) -> + lists:map(fun(Shard) -> + check_call(S, make_ref(), priority(IOClass, Shard)) + end, shards()) + end, io_classes())}, + ?_assertEqual(20, ioq:set_disk_concurrency(10)), + ?_assertError(badarg, ioq:set_disk_concurrency(0)), + ?_assertError(badarg, ioq:set_disk_concurrency(-1)), + ?_assertError(badarg, ioq:set_disk_concurrency(foo))]. + +check_call(Server, Call, Priority) -> + ?_assertEqual({reply, Call}, ioq:call(Server, Call, Priority)). + +io_classes() -> [interactive, view_update, db_compact, view_compact, + internal_repl, other]. + +shards() -> + [ + <<"shards/0-1/heroku/app928427/couchrest.1317609656.couch">>, + <<"shards/0-1/foo">>, + <<"shards/0-3/foo">>, + <<"shards/0-1/bar">>, + <<"shards/0-1/kocolosk/stats.1299297461.couch">>, + <<"shards/0-1/kocolosk/my/db.1299297457.couch">>, + other + ]. + +priority(view_update, Shard) -> + {view_update, Shard, <<"_design/foo">>}; +priority(Any, Shard) -> + {Any, Shard}.
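The `setup/0` function above stands up a fake server: a bare process that answers any `$gen_call` message by echoing the call payload back, which lets `check_call/3` assert that `ioq:call` delivers each prioritized request intact without touching real IO. The same test-double pattern can be sketched in Python, using queues as a stand-in for Erlang mailboxes — `fake_server`, `inbox`, and `outbox` are hypothetical names, not part of ioq:

```python
import queue
import threading

def fake_server(inbox):
    """Echo server: replies to each request with its own payload,
    mirroring the FakeServer fun in setup/0. A None message is a
    shutdown sentinel (a test-only convenience, not in the original)."""
    while True:
        msg = inbox.get()
        if msg is None:
            return
        reply_to, ref, call = msg
        # Echo the call back, tagged with the caller's ref, so the test
        # can check the request survived dispatch unchanged.
        reply_to.put((ref, ("reply", call)))

inbox = queue.Queue()
threading.Thread(target=fake_server, args=(inbox,), daemon=True).start()

outbox = queue.Queue()
inbox.put((outbox, "ref1", "payload"))
ref, resp = outbox.get(timeout=5)
inbox.put(None)  # shut the fake server down
```

The value of the pattern is the same in both languages: the queueing layer under test sits between caller and server, and the echoed payload proves the request made it through dispatch and prioritization unmodified.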