MapReduce utility functions #20

Open
wants to merge 1 commit into from

1 participant

@whitenode

I have created an Erlang module containing a set of mapreduce utility functions for Riak. Some of the examples provided use multiple functions and the same example data set, so I chose not to break the functions out into separate files.

397 function-contrib-gollum/riak_mapreduce_utils.md
@@ -0,0 +1,397 @@
+Overview
+========
+
+The riak_mapreduce_utils module contains a collection of seven mapreduce utility functions implemented in Erlang.
+
+The first one is a map phase delete function called **map_delete**, which allows deletes to be performed locally where the data resides. This complements the [reduce phase delete function](http://contrib.basho.com/delete_keys.html) that is already available. It can be configured to only process records belonging to a specific bucket.
+
+The next two functions, **map_indexlink** and **map_indexinclude**, allow master-detail relationships to be defined and managed through secondary indexes (2i) instead of links. These functions rely on a reference to the master record being set as a secondary index on each of the detail records. Unlike when using links to manage master-detail relationships, this method allows detail records to be added or deleted without having to update the master record.
+
+**map_indexlink** allows retrieval of the master record from a detail record based on the 2i reference, in a manner similar to how links work.
+
+**map_indexinclude** allows retrieval of detail records linked to a master record through an index query.
+
+The fourth one is a map phase function called **map_metafilter**. This allows records to be filtered out of the result set based on bucket and metadata (2i and user metadata).
+
+**map_id** returns readable bucket and key pairs, while the **map_key** function just returns readable keys.
+
+The last one, **map_datasize**, returns the size of the stored object.
+
+Installation
+============
+
+Compile the module by opening an Erlang shell with the directory containing the module file on the code path, as shown below.
+
+ $> erl -pa .
+ Erlang R15B (erts-5.9) [source] [smp:2:2] [async-threads:0] [hipe] [kernel-poll:false]
+
+ Eshell V5.9 (abort with ^G)
+ 1> c(riak_mapreduce_utils.erl).
+ {ok,riak_mapreduce_utils}
+ 2>
+
+The compiled file must then be deployed to all nodes in the cluster. This is done by placing the *riak_mapreduce_utils.beam* file in a directory indicated by the *add_paths* parameter in the *riak_kv* section of the *app.config* file. Please see the [Basho Wiki](http://wiki.basho.com/Configuration-Files.html) for further details.
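+For illustration, the relevant part of *app.config* could look as follows. The path shown is an example only; it should point at the directory on each node where the beam file was placed:
+
```erlang
%% app.config fragment - riak_kv section (example path only)
{riak_kv, [
           {add_paths, ["/usr/local/riak/custom_modules"]}
          ]}
```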
+
+Function Reference
+==================
+
+This section contains more detailed information about the configuration, usage and behaviour of the functions provided in this module.
+
+The examples provided assume we have test data in the two buckets named **master** and **detail**. The records in the detail bucket have a reference to the appropriate master in a binary secondary index called **fk_master_bin**.
+
+The records in the **detail** bucket also have an integer secondary index named **idx_int** as well as a metadata field called **type**. These have been added for use with the **map_metafilter** function.
+
+As these mapping functions do not operate on the record data in any way, the data has been set to a generic string.
+
+The example records can be created through *curl* as follows:
+
+ $> curl -X POST \
+ -d 'm1_data' \
+ http://localhost:8098/buckets/master/keys/m1
+
+ $> curl -X POST \
+ -d 'm2_data' \
+ http://localhost:8098/buckets/master/keys/m2
+
+ $> curl -X POST \
+ -H 'x-riak-index-fk_master_bin: m1' \
+ -H 'x-riak-index-idx_int: 12' \
+ -H 'x-riak-meta-type: X' \
+ -d 'd1_data' \
+ http://localhost:8098/buckets/detail/keys/d1
+
+ $> curl -X POST \
+ -H 'x-riak-index-fk_master_bin: m1' \
+ -H 'x-riak-index-idx_int: 68' \
+ -H 'x-riak-meta-type: Y' \
+ -d 'd2_data' \
+ http://localhost:8098/buckets/detail/keys/d2
+
+ $> curl -X POST \
+ -H 'x-riak-index-fk_master_bin: m1' \
+ -H 'x-riak-index-idx_int: 125' \
+ -H 'x-riak-meta-type: X' \
+ -d 'd3_data' \
+ http://localhost:8098/buckets/detail/keys/d3
+
+ $> curl -X POST \
+ -H 'x-riak-index-fk_master_bin: m2' \
+ -H 'x-riak-index-idx_int: 9' \
+ -H 'x-riak-meta-type: Y' \
+ -d 'd4_data' \
+ http://localhost:8098/buckets/detail/keys/d4
+
+map_id()
+----------------
+
+The **map_id** function takes one optional argument, the name of the bucket to print IDs from. If specified, records belonging to a bucket different from the one specified will be filtered out. If the parameter is not specified, the IDs of all records that are passed to the function will be output.
+
+###Example
+
+This example lists all IDs currently present in the *detail* bucket.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":"detail",
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_id"}}]}'
+
+ [["detail","d2"],["detail","d3"],["detail","d4"],["detail","d1"]]
+ $>
+
+map_key()
+----------------
+
+The **map_key** function takes one optional argument, the name of the bucket to print keys from. If specified, records belonging to a bucket different from the one specified will be filtered out. If the parameter is not specified, the keys of all records that are passed to the function will be output.
+
+###Example
+
+This example specifies that only keys belonging to the *detail* bucket are to be output, and the record belonging to the *master* bucket is therefore suppressed.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":[["detail","d3"],["master","m1"],["detail", "d1"]],
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_key","arg":"detail"}}]}'
+
+ ["d3","d1"]
+ $>
+
+map_delete()
+------------
+
+The **map_delete** function takes one optional argument, the name of the bucket to delete records from. If this parameter (a string) is defined, it will only delete records that belong to this bucket. If no argument is specified, it will delete any record passed into it.
+
+The function returns the number of records deleted and can be used together with **riak_kv_mapreduce:reduce_sum** to get the total number of records deleted through the job.
+
+###Examples
+
+**Delete all records in the detail bucket**
+
+This example deletes all records belonging to the *detail* bucket and returns a count of the number of records deleted.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":"detail",
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_delete","keep":false}},
+ {"reduce":{"language":"erlang","module":"riak_kv_mapreduce",
+ "function":"reduce_sum"}}]}'
+ [4]
+ $>
+
+**Delete only records passed in that belong to the *detail* bucket**
+
+The following example contains a filter for only deleting records belonging to the *detail* bucket. Of the two records passed in, only the one from the *detail* bucket is deleted.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":[["master", "m1"],["detail", "d4"]],
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_delete","keep":false,"arg":"detail"}},
+ {"reduce":{"language":"erlang","module":"riak_kv_mapreduce",
+ "function":"reduce_sum"}}]}'
+ [1]
+ $>
+
+map_indexlink()
+---------------
+
+The **map_indexlink** function allows retrieval of related records through secondary index links in a way similar to how link walking works. It assumes that the link is specified through a secondary index containing a reference (key) to another record. This allows the retrieval of e.g. a master record based on a detail record.
+
+The function takes a JSON formatted argument that specifies how the linking is done and which index is to be used.
+
+The function can be configured to return either both the source and target records or just the resulting target record.
+
+###Configuration
+
+The configuration string must be a correctly formatted JSON document and may contain the following 4 fields:
+
+**source** - Name of the bucket containing records to be processed. If a record is encountered that does not belong to the specified bucket, it will not be linked; whether it is passed through depends on the **keep** parameter described below.
+
+This is an optional field, and if no **source** is specified, all records passed in will be processed.
+
+**target** - This field is mandatory and must contain the name of the bucket the operation should link to. The value of this parameter will be used together with the index value to create the target object ID.
+
+**indexname** - This field is mandatory and must contain the name of the index to be used for the linking.
+
+**keep** - This parameter is optional and defaults to *true*. It specifies whether the input should be kept as part of the result set or not. If set to false, only records retrieved through the link are returned.
+
+Below is an example of a valid configuration based on the test data specified above.
+
+ "{
+ "source":"detail",
+ "target":"master",
+ "indexname":"fk_master_bin",
+ "keep":"false"
+ }"
+
+If a single detail record is processed by a function with this configuration, it will return just the master record as it is configured to not keep the input record in the result set.
+
+Note that the link is evaluated once for every record passed in, so if multiple detail records belonging to the same master record are passed in, the master record may occur multiple times in the result.
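+If such duplicates are unwanted, they can be removed on the client side once the result has been fetched. A minimal sketch in Python (illustrative only; not part of the module):
+
```python
# Collapse duplicate [bucket, key] pairs in a mapreduce result while
# preserving the order in which they first appeared.
def dedupe(ids):
    seen = set()
    unique = []
    for bucket, key in ids:
        if (bucket, key) not in seen:
            seen.add((bucket, key))
            unique.append([bucket, key])
    return unique

result = [["master", "m1"], ["master", "m1"], ["detail", "d1"]]
print(dedupe(result))  # [['master', 'm1'], ['detail', 'd1']]
```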
+
+###Examples
+
+**Retrieving master record related to single detail record**
+
+The following example takes a single detail record as input and returns this record (*detail:d2*) together with the master record (*master:m1*) it is related to as no value was specified for the **keep** parameter passed to the function.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":[["detail","d2"]],
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_indexlink","keep":false,
+ "arg":"{\"source\":\"detail\",\"target\":\"master\",
+ \"indexname\":\"fk_master_bin\"}"}},
+ {"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_id"}}]}'
+ [["detail","d2"],["master","m1"]]
+ $>
+
+**Retrieving master record related to single detail record while dropping the input**
+
+This example returns the master record only (*master:m1*), as the input to the map phase function (*detail:d2*) is configured to be dropped.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":[["detail","d2"]],
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_indexlink","keep":false,
+ "arg":"{\"source\":\"detail\",\"target\":\"master\",
+ \"indexname\":\"fk_master_bin\",\"keep\":\"false\"}"}},
+ {"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_id"}}]}'
+ [["master","m1"]]
+ $>
+
+###Limitations
+
+As the function relies on secondary indexes, it requires a storage backend that supports secondary indexes, e.g. eleveldb.
+
+map_indexinclude()
+------------------
+
+In the example data provided, the link between detail records and their respective master record is created through a secondary index on the detail records (*fk_master_bin*). **map_indexinclude** makes it possible to retrieve such detail records based on the master record.
+
+This function complements **map_indexlink** in that it allows processing of the relationship the other way.
+
+The function takes a JSON formatted argument that specifies how the linking is done and which index is to be used.
+
+###Configuration
+
+The configuration string must be a correctly formatted JSON document and may contain the following 4 fields:
+
+**source** - Name of the bucket containing records to be processed. This is a mandatory field and only records belonging to this bucket will be processed. If a record is encountered that does not belong to the specified bucket, it will not be linked; whether it is passed through depends on the **keep** parameter described below.
+
+**target** - This field is mandatory and must contain the name of the bucket the operation should link to.
+
+**indexname** - This field is mandatory and must contain the name of the index on the target bucket that is to be used.
+
+**keep** - This parameter is optional and defaults to *true*. It specifies whether the input should be kept as part of the result set or not. If set to false, only records retrieved through the link are returned.
+
+###Example
+
+**Retrieve all detail records related to a specific master record**
+
+This example takes a single master record and retrieves all related detail records. As **keep** is set to *false*, the original master record that the fetch was based on is excluded from the result set, and only the appropriate detail records are returned.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":[["master","m1"]],
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_indexinclude","keep":false,
+ "arg":"{\"source\":\"master\",\"target\":\"detail\",
+ \"indexname\":\"fk_master_bin\",\"keep\":\"false\"}"}},
+ {"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_id"}}]}'
+ [["detail","d3"],["detail","d2"],["detail","d1"]]
+ $>
+
+###Limitations
+
+As the function relies on secondary indexes, it requires a storage backend that supports secondary indexes, e.g. eleveldb.
+
+map_metafilter()
+----------------
+
+**map_metafilter** allows records to be filtered out of the result set through configurable criteria based on content-agnostic metadata, i.e. secondary indexes, user metadata and bucket name.
+
+If a record matches all the criteria configured for the filter, it will be dropped/excluded while other records will be passed through to the next step.
+
+###Configuration
+
+The configuration string must be a correctly formatted and escaped JSON document and may contain the following two fields, of which at least one must be present:
+
+**source** - Name of the bucket containing records to be filtered. If a record is encountered that does not belong to the specified bucket, it will be passed straight through and not processed with respect to criteria by the function.
+
+This is an optional field, and if no **source** is specified, all records passed in will be processed.
+
+**criteria** - This field contains a list of criteria. These will be evaluated against the record, and if ALL of them evaluate to true, the record will be filtered out.
+
+A criterion consists of an *operation*, a *field* to operate on and a *parameter*.
+
+Fields can be either a secondary index (specified as *"index:\<index name\>"*) or user metadata field (specified as *"meta:\<meta data name\>"*).
+
+The following operations are allowed:
+
+- **"eq"** takes a single parameter and will evaluate to *true* if the field value is equal to the parameter. Example: ["eq","index:idx_int","12"]
+
+- **"neq"** takes a single parameter and will evaluate to *true* if the field value is NOT equal to the parameter. Example: ["neq","index:idx_int","12"]
+
+- **"greater_than"** takes a single parameter and will evaluate to *true* if the field value is strictly greater than the parameter value. If the field value is an integer, an attempt will be made to convert the parameter to integer and perform the evaluation based on these. If this fails, the field value will be converted to binary and compared to the parameter. Example: ["greater_than","index:idx_int","12"]
+
+- **"greater_than_eq"** takes a single parameter and will evaluate to *true* if the field value is greater than or equal to the parameter value. If the field value is an integer, an attempt will be made to convert the parameter to integer and perform the evaluation based on these. If this fails, the field value will be converted to binary and compared to the parameter. Example: ["greater_than_eq","meta:X-Riak-Meta-Type","Y"]
+
+- **"less_than"** takes a single parameter and will evaluate to *true* if the field value is strictly less than the parameter value. If the field value is an integer, an attempt will be made to convert the parameter to integer and perform the evaluation based on these. If this fails, the field value will be converted to binary and compared to the parameter. Example: ["less_than","index:idx_int","56"]
+
+- **"less_than_eq"** takes a single parameter and will evaluate to *true* if the field value is less than or equal to the parameter value. If the field value is an integer, an attempt will be made to convert the parameter to integer and perform the evaluation based on these. If this fails, the field value will be converted to binary and compared to the parameter. Example: ["less_than_eq","index:idx_int","12"]
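+As a rough illustration of these rules, the Python sketch below models how a single criterion and a list of criteria are evaluated. The names are illustrative only; the actual logic is the Erlang *check_value* function in *riak_mapreduce_utils.erl*.
+
```python
import operator

OPS = {
    "eq": operator.eq,
    "neq": operator.ne,
    "greater_than": operator.gt,
    "greater_than_eq": operator.ge,
    "less_than": operator.lt,
    "less_than_eq": operator.le,
}

def check_value(op, value, param):
    """Evaluate one criterion against a field value.

    If the field value is an integer, try to coerce the parameter to an
    integer first; if that fails, fall back to a string comparison.
    """
    if isinstance(value, int):
        try:
            param = int(param)
        except ValueError:
            value = str(value)
    return OPS[op](value, param)

def matches(criteria, fields):
    """A record is filtered out only if ALL criteria evaluate to true."""
    return all(
        field in fields and check_value(op, fields[field], param)
        for op, field, param in criteria
    )

# Record d1 from the test data above: idx_int = 12, type = "X"
d1 = {"index:idx_int": 12, "meta:X-Riak-Meta-Type": "X"}
criteria = [("eq", "meta:X-Riak-Meta-Type", "X"),
            ("less_than", "index:idx_int", "15")]
print(matches(criteria, d1))  # True: d1 matches both criteria and is dropped
```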
+
+Below is an example of a valid configuration based on the test data specified above.
+
+ "{
+ "source":"detail",
+ "criteria":[["eq","meta:X-Riak-Meta-Type","X"],["less_than","index:idx_int","15"]]
+ }"
+
+###Examples
+
+**Filtering all records from a specific bucket**
+
+This example filters out all records belonging to the *master* bucket and keeps everything else.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":[["master","m1"],["detail","d2"],["detail","d3"]],
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_metafilter","keep":false,
+ "arg":"{\"source\":\"master\"}"}},
+ {"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_id"}}]}'
+ [["detail","d2"],["detail","d3"]]
+ $>
+
+**Filtering based on meta field**
+
+This example filters based on the *X-Riak-Meta-Type* field and drops all records that have this field set to *X*.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":"detail",
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_metafilter","keep":false,
+ "arg":"{\"criteria\":[[\"eq\",\"meta:X-Riak-Meta-Type\",\"X\"]]}"}},
+ {"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_id"}}]}'
+ [["detail","d4"],["detail","d2"]]
+ $>
+
+**Filtering based on index field**
+
+This example shows how to filter based on a secondary integer index. It filters out all *detail* records that have an index value less than or equal to 68.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":"detail",
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_metafilter","keep":false,
+ "arg":"{\"source\":\"detail\",
+ \"criteria\":[[\"less_than_eq\",\"index:idx_int\",\"68\"]]}"}},
+ {"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_id"}}]}'
+ [["detail","d3"]]
+ $>
+
+**Composite filtering of specific bucket**
+
+The example below shows how to filter out all the records that have an integer index value between 10 and 100, including the edge values.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":"detail",
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_metafilter","keep":false,
+ "arg":"{\"source\":\"detail\",
+ \"criteria\":[[\"greater_than_eq\",\"index:idx_int\",\"10\"],
+ [\"less_than_eq\",\"index:idx_int\",\"100\"]]}"}},
+ {"map":{"language":"erlang","module":"riak_mapreduce_utils",
+ "function":"map_id"}}]}'
+ [["detail","d4"],["detail","d3"]]
+ $>
+
+map_datasize()
+----------------
+
+The **map_datasize** function returns the size in bytes of the stored value. If siblings are present, it returns the combined size of their values.
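+The behaviour can be modelled in a few lines of Python (illustrative only; the real implementation is the Erlang *map_datasize* function in the module):
+
```python
# Model of map_datasize: the result is the byte size of the stored value,
# summed over all sibling values when more than one is present.
def map_datasize(sibling_values):
    """Combined byte size of all sibling values of a single object."""
    return sum(len(v) for v in sibling_values)

# 'm1_data' is 7 bytes; with a hypothetical sibling 'm2_data' it would be 14.
print(map_datasize([b"m1_data"]))              # 7
print(map_datasize([b"m1_data", b"m2_data"]))  # 14
```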
+
+###Example
+
+This example sums up the size of data for all records in the *master* bucket.
+
+ $> curl -XPOST http://localhost:8098/mapred -H 'Content-Type: application/json' -d '{
+ "inputs":"master",
+ "query":[{"map":{"language":"erlang","module":"riak_mapreduce_utils","function":"map_datasize"}},
+ {"reduce":{"language":"erlang","module":"riak_kv_mapreduce","function":"reduce_sum"}}]}'
+ [14]
+ $>
+
+
+License & Copyright
+-------------------
+
+Copyright ©2012 Christian Dahlqvist, WhiteNode Software Ltd
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
+
377 mapreduce/erlang/riak_mapreduce_utils.erl
@@ -0,0 +1,377 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_mapreduce_utils: Utility functions for defining map/reduce processing.
+%%
+%% Copyright (c)2012, Christian Dahlqvist, WhiteNode Software Ltd. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_mapreduce_utils).
+
+-export([map_delete/3,
+ map_indexinclude/3,
+ map_indexlink/3,
+ map_metafilter/3,
+ map_id/3,
+ map_key/3,
+ map_datasize/3
+ ]).
+
+%% From riak_pb_kv_codec.hrl
+-define(MD_USERMETA, <<"X-Riak-Meta">>).
+-define(MD_INDEX, <<"index">>).
+
+%%
+%% Map Phases
+%%
+
+%% @spec map_delete(riak_object:riak_object(), term(), term()) ->
+%% [integer()]
+%% @doc map phase function for deleting records
+map_delete({error, notfound}, _, _) ->
+ [];
+map_delete(RiakObject, Props, Arg) when is_list(Arg) ->
+ map_delete(RiakObject, Props, list_to_binary(Arg));
+map_delete(RiakObject, Props, Arg) when is_atom(Arg) ->
+ map_delete(RiakObject, Props, <<"">>);
+map_delete(RiakObject, _, Arg) when is_binary(Arg) ->
+ {ok, C} = riak:local_client(),
+ Bucket = riak_object:bucket(RiakObject),
+ Key = riak_object:key(RiakObject),
+ case Arg of
+ Bucket ->
+ C:delete(Bucket, Key),
+ [1];
+ <<"">> ->
+ C:delete(Bucket, Key),
+ [1];
+ _ ->
+ []
+ end;
+map_delete(_, _, _) ->
+ [].
+
+%% @spec map_indexinclude(riak_object:riak_object(), term(), term()) ->
+%% [{{Bucket :: binary(), Key :: binary()}, Props :: term()}]
+%% @doc map phase function for including based on secondary index query in a
+%% manner similar to links.
+map_indexinclude({error, notfound}, _, _) ->
+ [];
+map_indexinclude(RiakObject, Props, JsonArg) ->
+ Bucket = riak_object:bucket(RiakObject),
+ Key = riak_object:key(RiakObject),
+ InitialList = [{{Bucket, Key}, Props}],
+    % Parse config arg
+ {struct, Args} = mochijson2:decode(JsonArg),
+ case proplists:get_value(<<"keep">>, Args) of
+ <<"false">> ->
+ Keep = false;
+ _ ->
+ Keep = true
+ end,
+ case {proplists:get_value(<<"source">>, Args),
+ proplists:get_value(<<"target">>, Args),
+ proplists:get_value(<<"indexname">>, Args)} of
+ {undefined, _, _} ->
+ return_list(InitialList, [], Keep);
+ {_, undefined, _} ->
+ return_list(InitialList, [], Keep);
+ {_, _, undefined} ->
+ return_list(InitialList, [], Keep);
+ {Bucket, Target, IndexName} ->
+ Result = get_index_items(Target, Props, IndexName, Key),
+ return_list(InitialList, Result, Keep);
+ _ ->
+ return_list(InitialList, [], Keep)
+ end.
+
+%% @spec map_indexlink(riak_object:riak_object(), term(), term()) ->
+%% [{{Bucket :: binary(), Key :: binary()}, Props :: term()}]
+%% @doc map phase function for inclusion based on local secondary index value
+map_indexlink({error, notfound}, _, _) ->
+ [];
+map_indexlink(RiakObject, Props, JsonArg) ->
+ Bucket = riak_object:bucket(RiakObject),
+ Key = riak_object:key(RiakObject),
+ InitialList = [{{Bucket, Key}, Props}],
+ {struct, Args} = mochijson2:decode(JsonArg),
+ case proplists:get_value(<<"keep">>, Args) of
+ <<"false">> ->
+ Keep = false;
+ _ ->
+ Keep = true
+ end,
+ case {proplists:get_value(<<"source">>, Args),
+ proplists:get_value(<<"target">>, Args),
+ proplists:get_value(<<"indexname">>, Args)} of
+ {undefined, _, _} ->
+ return_list(InitialList, [], Keep);
+ {_, undefined, _} ->
+ return_list(InitialList, [], Keep);
+ {_, _, undefined} ->
+ return_list(InitialList, [], Keep);
+ {Bucket, Target, IndexName} ->
+ Result = create_indexlink_list(RiakObject, Props, IndexName, Target),
+ return_list(InitialList, Result, Keep);
+ _ ->
+ return_list(InitialList, [], Keep)
+ end.
+
+%% @spec map_metafilter(riak_object:riak_object(), term(), term()) ->
+%% [{{Bucket :: binary(), Key :: binary()}, Props :: term()}]
+%% @doc map phase function for selectively discarding records from the current set
+map_metafilter({error, notfound}, _, _) ->
+ [];
+map_metafilter(RiakObject, Props, JsonArg) ->
+ Bucket = riak_object:bucket(RiakObject),
+ Key = riak_object:key(RiakObject),
+ MetaDataList = riak_object:get_metadatas(RiakObject),
+ InitialList = [{{Bucket, Key}, Props}],
+ {struct, Args} = mochijson2:decode(JsonArg),
+ case {proplists:get_value(<<"source">>, Args),
+ proplists:get_value(<<"criteria">>, Args)} of
+ {Bucket, undefined} ->
+ Result = true;
+ {Bucket, []} ->
+ Result = true;
+ {Bucket, Criteria} when is_list(Criteria)->
+ Result = check_criteria(MetaDataList, Criteria);
+ {undefined, Criteria} when is_list(Criteria) ->
+ Result = check_criteria(MetaDataList, Criteria);
+ _ ->
+ Result = false
+ end,
+ case Result of
+ true ->
+ [];
+ _ ->
+ InitialList
+ end.
+
+%% @spec map_id(riak_object:riak_object(), term(), term()) ->
+%% [[Bucket :: binary(), Key :: binary()]]
+%% @doc map phase function returning bucket name and key in a readable format
+map_id({error, notfound}, _, _) ->
+ [];
+map_id(RiakObject, Props, Arg) when is_list(Arg) ->
+ map_id(RiakObject, Props, list_to_binary(Arg));
+map_id(RiakObject, Props, Arg) when is_atom(Arg) ->
+ map_id(RiakObject, Props, <<"">>);
+map_id(RiakObject, _, Arg) when is_binary(Arg) ->
+ Bucket = riak_object:bucket(RiakObject),
+ Key = riak_object:key(RiakObject),
+ case Arg of
+ Bucket ->
+ [[Bucket, Key]];
+ <<"">> ->
+ [[Bucket, Key]];
+ _ ->
+ []
+ end;
+map_id(_, _, _) ->
+ [].
+
+%% @spec map_key(riak_object:riak_object(), term(), term()) ->
+%% [Key :: binary()]
+%% @doc map phase function returning object key in a readable format
+map_key({error, notfound}, _, _) ->
+ [];
+map_key(RiakObject, Props, Arg) when is_list(Arg) ->
+ map_key(RiakObject, Props, list_to_binary(Arg));
+map_key(RiakObject, Props, Arg) when is_atom(Arg) ->
+ map_key(RiakObject, Props, <<"">>);
+map_key(RiakObject, _, Arg) when is_binary(Arg) ->
+ Bucket = riak_object:bucket(RiakObject),
+ Key = riak_object:key(RiakObject),
+ case Arg of
+ Bucket ->
+ [Key];
+ <<"">> ->
+ [Key];
+ _ ->
+ []
+ end;
+map_key(_, _, _) ->
+ [].
+
+%% @spec map_datasize(riak_object:riak_object(), term(), term()) ->
+%% [integer()]
+%% @doc map phase function returning size of the data stored in bytes.
+%% Returns the combined size if siblings are found.
+map_datasize({error, notfound}, _, _) ->
+ [];
+map_datasize(RiakObject, _, _) ->
+ DataSize = lists:foldl(fun(V, A) ->
+ (byte_size(V) + A)
+ end, 0, riak_object:get_values(RiakObject)),
+ [DataSize].
+
+%% hidden
+get_index_items(Bucket, Props, IndexName, Value) ->
+ {ok, C} = riak:local_client(),
+ case C:get_index(Bucket, {eq, IndexName, Value}) of
+ {ok, KeyList} ->
+ [{{Bucket, K}, Props} || K <- KeyList];
+ {error, _} ->
+ []
+ end.
+
+%% hidden
+create_indexlink_list(RiakObject, Props, IndexName, Target) ->
+ DictList = riak_object:get_metadatas(RiakObject),
+ Result = create_indexlink_list(DictList, Props, IndexName, Target, []),
+ sets:to_list(sets:from_list(Result)).
+
+%% hidden
+create_indexlink_list([], _Props, _IndexName, _Target, List) ->
+ sets:to_list(sets:from_list(List));
+create_indexlink_list([Dict | DictList], Props, IndexName, Target, List) ->
+ case dict:find(?MD_INDEX, Dict) of
+ error ->
+ create_indexlink_list(DictList, Props, IndexName, Target, List);
+ {ok, IndexList} ->
+ case [I || {K, I} <- IndexList, K == IndexName] of
+ [] ->
+ create_indexlink_list(DictList, Props, IndexName, Target, List);
+ [Indexes] when is_list(Indexes) ->
+ Result = [{{Target, V}, Props} || V <- Indexes],
+ ResList = lists:append(Result, List),
+ create_indexlink_list(DictList, Props, IndexName, Target, ResList);
+ [Index] ->
+ ResList = lists:append([{{Target, Index}, Props}], List),
+ create_indexlink_list(DictList, Props, IndexName, Target, ResList)
+ end
+ end.
+
+%% hidden
+return_list(Original, Result, true) ->
+ lists:append([Original, Result]);
+return_list(_, Result, _) ->
+ Result.
+
+%% hidden
+check_criteria(MetaDataList, Criteria) when is_list(Criteria) ->
+ case parse_criteria(Criteria, []) of
+ error ->
+ false;
+ CList ->
+ check_parsed_criteria(MetaDataList, CList)
+ end;
+check_criteria(_, _) ->
+ error.
+
+%% hidden
+parse_criteria([], CList) ->
+ CList;
+parse_criteria([C | R], CList) ->
+ case C of
+ [Op, Field, Val] ->
+ case {Op, parse_field(Field)} of
+ {_, error} -> error;
+ {<<"eq">>, F} -> parse_criteria(R, lists:append([{eq, F, Val}], CList));
+ {<<"neq">>, F} -> parse_criteria(R, lists:append([{neq, F, Val}], CList));
+ {<<"greater_than">>, F} -> parse_criteria(R, lists:append([{greater_than, F, Val}], CList));
+ {<<"greater_than_eq">>, F} -> parse_criteria(R, lists:append([{greater_than_eq, F, Val}], CList));
+ {<<"less_than">>, F} -> parse_criteria(R, lists:append([{less_than, F, Val}], CList));
+ {<<"less_than_eq">>, F} -> parse_criteria(R, lists:append([{less_than_eq, F, Val}], CList));
+ _ -> error
+ end;
+ _ -> error
+ end.
+
+%% hidden
+parse_field(Field) when is_list(Field) ->
+ parse_field(list_to_binary(Field));
+parse_field(Field) when is_binary(Field) ->
+ case Field of
+ <<"meta:", Val/binary>> -> {meta, Val};
+ <<"index:", Val/binary>> -> {index, Val};
+ _ -> error
+ end;
+parse_field(_) ->
+ error.
+
+%% hidden
+check_parsed_criteria([], _CList) ->
+ false;
+check_parsed_criteria([MetaData | Rest], CList) ->
+ case evaluate_criteria(MetaData, CList) of
+ true -> true;
+ _ -> check_parsed_criteria(Rest, CList)
+ end.
+
+%% hidden
+evaluate_criteria(_MetaData, []) ->
+ true;
+evaluate_criteria(MetaData, [{Op, {Type, F}, V} | List]) ->
+ case get_metadata_value(MetaData, Type, F) of
+ undefined ->
+ false;
+ Value ->
+ case check_value(Op, Value, V) of
+ true ->
+ evaluate_criteria(MetaData, List);
+ _ ->
+ false
+ end
+ end.
+
+%% hidden
+get_metadata_value(MetaData, Type, MetaName) ->
+ case Type of
+ meta ->
+ MetaKey = ?MD_USERMETA,
+ MN = binary_to_list(MetaName);
+ index ->
+ MetaKey = ?MD_INDEX,
+ MN = MetaName
+ end,
+ case dict:find(MetaKey, MetaData) of
+ {ok, Value} ->
+ case [V || {K, V} <- Value, K == MN] of
+ [] -> undefined;
+ [V] when is_list(V) -> list_to_binary(V);
+ [V] -> V
+ end;
+ error -> undefined
+ end.
+
+%% hidden
+check_value(Op, Value, Param) when is_integer(Value) andalso is_binary(Param) ->
+ try list_to_integer(binary_to_list(Param)) of
+ Integer -> check_value(Op, Value, Integer)
+ catch
+ _:_ ->
+ BValue = list_to_binary(integer_to_list(Value)),
+            %% Param is already a binary here; converting it again would fail
+            check_value(Op, BValue, Param)
+ end;
+check_value(Op, Value, Param) when is_binary(Value) andalso is_integer(Param) ->
+ Pbin = list_to_binary(integer_to_list(Param)),
+ check_value(Op, Value, Pbin);
+check_value(eq, Value, Param)->
+ Value == Param;
+check_value(neq, Value, Param) ->
+ Value =/= Param;
+check_value(greater_than, Value, Param) ->
+ Value > Param;
+check_value(greater_than_eq, Value, Param) ->
+ Value >= Param;
+check_value(less_than, Value, Param) ->
+ Value < Param;
+check_value(less_than_eq, Value, Param) ->
+ Value =< Param.