Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

444 lines (324 sloc) 36.395 kB
<!DOCTYPE html>
<!-- saved from url=(0014)about:internet -->
<!-- Standalone HTML rendering of the "Mapreduce in R" tutorial. -->
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Mapreduce in R</title>
<!-- Default target for every link and form in the page: a new browsing context. -->
<base target="_blank">
<style type="text/css">
/* ---- Base typography ---------------------------------------------- */
body,
td {
  font-family: sans-serif;
  background-color: white;
  font-size: 12px;
  margin: 8px;
}

/* Monospace stack shared by inline and block code. */
tt,
code,
pre {
  font-family: 'DejaVu Sans Mono', 'Droid Sans Mono', 'Lucida Console', Consolas, Monaco, monospace;
}

/* ---- Heading scale (relative to the inherited font size) ----------- */
h1 { font-size: 2.2em; }
h2 { font-size: 1.8em; }
h3 { font-size: 1.4em; }
h4 { font-size: 1.0em; }
h5 { font-size: 0.9em; }
h6 { font-size: 0.8em; }

/* ---- Links ---------------------------------------------------------- */
a:visited {
  color: rgb(50%, 0%, 50%);
}

/* ---- Code blocks ---------------------------------------------------- */
pre {
  margin-top: 0;
  max-width: 95%;
  border: 1px solid #ccc;
}

pre code {
  display: block;
  padding: 0.5em;
}

/* Light grey background for R chunks only. */
code.r {
  background-color: #F8F8F8;
}

/* ---- Tables, quotes and rules --------------------------------------- */
table,
td,
th {
  border: none;
}

blockquote {
  color: #666666;
  margin: 0;
  padding-left: 1em;
  border-left: 0.5em #EEE solid;
}

/* Zero-height rule drawn only by its dotted top border. */
hr {
  height: 0px;
  border-bottom: none;
  border-top-width: thin;
  border-top-style: dotted;
  border-top-color: #999999;
}

/* ---- Print layout --------------------------------------------------- */
@media print {
  /* Drop backgrounds, colours and filters on paper. */
  * {
    background: transparent !important;
    color: black !important;
    filter: none !important;
    -ms-filter: none !important;
  }

  body {
    font-size: 12pt;
    max-width: 100%;
  }

  a,
  a:visited {
    text-decoration: underline;
  }

  /* A rule is not drawn in print; it forces a page break instead. */
  hr {
    visibility: hidden;
    page-break-before: always;
  }

  pre,
  blockquote {
    padding-right: 1em;
    page-break-inside: avoid;
  }

  tr,
  img {
    page-break-inside: avoid;
  }

  img {
    max-width: 100% !important;
  }

  /* Mirrored margins for left and right pages. */
  @page :left {
    margin: 15mm 20mm 15mm 10mm;
  }

  @page :right {
    margin: 15mm 10mm 15mm 20mm;
  }

  p,
  h2,
  h3 {
    orphans: 3;
    widows: 3;
  }

  /* Keep a heading on the same page as the text that follows it. */
  h2,
  h3 {
    page-break-after: avoid;
  }
}
</style>
<!-- Styles for R syntax highlighter -->
<style type="text/css">
/* Token colours for the class names that the highlighter script puts on spans inside <pre>. */

/* Operators and brackets: muted blue-grey. */
pre .operator,
pre .paren {
  color: rgb(104, 118, 135);
}

/* Literals. */
pre .literal {
  color: rgb(88, 72, 246);
}

/* Numeric constants. */
pre .number {
  color: rgb(0, 0, 205);
}

/* Comments. */
pre .comment {
  color: rgb(76, 136, 107);
}

/* Keywords. */
pre .keyword {
  color: rgb(0, 0, 255);
}

/* Identifiers stay black. */
pre .identifier {
  color: rgb(0, 0, 0);
}

/* String literals. */
pre .string {
  color: rgb(3, 106, 7);
}
</style>
<!-- R syntax highlighter -->
<script type="text/javascript">
// Minified highlight.js core. Builds the global `hljs` object, which exposes
// LANGUAGES, highlight, highlightAuto, fixMarkup, highlightBlock,
// initHighlighting and initHighlightingOnLoad, plus the shared regex
// fragments (IR, UIR, NR, CNR, BNR, RSR, ER) and ready-made modes (BE, ASM,
// QSM, CLCM, CBLCLM, HCM, NM, CNM, BNM). Kept minified on purpose; the only
// change is that each top-level statement now sits on its own line, so that
// no string literal is split by a line break.
var hljs=new function(){function m(p){return p.replace(/&/gm,"&amp;").replace(/</gm,"&lt;")}function f(r,q,p){return RegExp(q,"m"+(r.cI?"i":"")+(p?"g":""))}function b(r){for(var p=0;p<r.childNodes.length;p++){var q=r.childNodes[p];if(q.nodeName=="CODE"){return q}if(!(q.nodeType==3&&q.nodeValue.match(/\s+/))){break}}}function h(t,s){var p="";for(var r=0;r<t.childNodes.length;r++){if(t.childNodes[r].nodeType==3){var q=t.childNodes[r].nodeValue;if(s){q=q.replace(/\n/g,"")}p+=q}else{if(t.childNodes[r].nodeName=="BR"){p+="\n"}else{p+=h(t.childNodes[r])}}}if(/MSIE [678]/.test(navigator.userAgent)){p=p.replace(/\r/g,"\n")}return p}function a(s){var r=s.className.split(/\s+/);r=r.concat(s.parentNode.className.split(/\s+/));for(var q=0;q<r.length;q++){var p=r[q].replace(/^language-/,"");if(e[p]){return p}}}function c(q){var p=[];(function(s,t){for(var r=0;r<s.childNodes.length;r++){if(s.childNodes[r].nodeType==3){t+=s.childNodes[r].nodeValue.length}else{if(s.childNodes[r].nodeName=="BR"){t+=1}else{if(s.childNodes[r].nodeType==1){p.push({event:"start",offset:t,node:s.childNodes[r]});t=arguments.callee(s.childNodes[r],t);p.push({event:"stop",offset:t,node:s.childNodes[r]})}}}}return t})(q,0);return p}function k(y,w,x){var q=0;var z="";var s=[];function u(){if(y.length&&w.length){if(y[0].offset!=w[0].offset){return(y[0].offset<w[0].offset)?y:w}else{return w[0].event=="start"?y:w}}else{return y.length?y:w}}function t(D){var A="<"+D.nodeName.toLowerCase();for(var B=0;B<D.attributes.length;B++){var C=D.attributes[B];A+=" "+C.nodeName.toLowerCase();if(C.value!==undefined&&C.value!==false&&C.value!==null){A+='="'+m(C.value)+'"'}}return A+">"}while(y.length||w.length){var v=u().splice(0,1)[0];z+=m(x.substr(q,v.offset-q));q=v.offset;if(v.event=="start"){z+=t(v.node);s.push(v.node)}else{if(v.event=="stop"){var p,r=s.length;do{r--;p=s[r];z+=("</"+p.nodeName.toLowerCase()+">")}while(p!=v.node);s.splice(r,1);while(r<s.length){z+=t(s[r]);r++}}}}return z+m(x.substr(q))}function j(){function q(x,y,v){if(x.compiled){return}var u;var s=[];if(x.k){x.lR=f(y,x.l||hljs.IR,true);for(var w in x.k){if(!x.k.hasOwnProperty(w)){continue}if(x.k[w] instanceof Object){u=x.k[w]}else{u=x.k;w="keyword"}for(var r in u){if(!u.hasOwnProperty(r)){continue}x.k[r]=[w,u[r]];s.push(r)}}}if(!v){if(x.bWK){x.b="\\b("+s.join("|")+")\\s"}x.bR=f(y,x.b?x.b:"\\B|\\b");if(!x.e&&!x.eW){x.e="\\B|\\b"}if(x.e){x.eR=f(y,x.e)}}if(x.i){x.iR=f(y,x.i)}if(x.r===undefined){x.r=1}if(!x.c){x.c=[]}x.compiled=true;for(var t=0;t<x.c.length;t++){if(x.c[t]=="self"){x.c[t]=x}q(x.c[t],y,false)}if(x.starts){q(x.starts,y,false)}}for(var p in e){if(!e.hasOwnProperty(p)){continue}q(e[p].dM,e[p],true)}}function d(B,C){if(!j.called){j();j.called=true}function q(r,M){for(var L=0;L<M.c.length;L++){if((M.c[L].bR.exec(r)||[null])[0]==r){return M.c[L]}}}function v(L,r){if(D[L].e&&D[L].eR.test(r)){return 1}if(D[L].eW){var M=v(L-1,r);return M?M+1:0}return 0}function w(r,L){return L.i&&L.iR.test(r)}function K(N,O){var M=[];for(var L=0;L<N.c.length;L++){M.push(N.c[L].b)}var r=D.length-1;do{if(D[r].e){M.push(D[r].e)}r--}while(D[r+1].eW);if(N.i){M.push(N.i)}return f(O,M.join("|"),true)}function p(M,L){var N=D[D.length-1];if(!N.t){N.t=K(N,E)}N.t.lastIndex=L;var r=N.t.exec(M);return r?[M.substr(L,r.index-L),r[0],false]:[M.substr(L),"",true]}function z(N,r){var L=E.cI?r[0].toLowerCase():r[0];var M=N.k[L];if(M&&M instanceof Array){return M}return false}function F(L,P){L=m(L);if(!P.k){return L}var r="";var O=0;P.lR.lastIndex=0;var M=P.lR.exec(L);while(M){r+=L.substr(O,M.index-O);var N=z(P,M);if(N){x+=N[1];r+='<span class="'+N[0]+'">'+M[0]+"</span>"}else{r+=M[0]}O=P.lR.lastIndex;M=P.lR.exec(L)}return r+L.substr(O,L.length-O)}function J(L,M){if(M.sL&&e[M.sL]){var r=d(M.sL,L);x+=r.keyword_count;return r.value}else{return F(L,M)}}function I(M,r){var L=M.cN?'<span class="'+M.cN+'">':"";if(M.rB){y+=L;M.buffer=""}else{if(M.eB){y+=m(r)+L;M.buffer=""}else{y+=L;M.buffer=r}}D.push(M);A+=M.r}function G(N,M,Q){var R=D[D.length-1];if(Q){y+=J(R.buffer+N,R);return false}var P=q(M,R);if(P){y+=J(R.buffer+N,R);I(P,M);return P.rB}var L=v(D.length-1,M);if(L){var O=R.cN?"</span>":"";if(R.rE){y+=J(R.buffer+N,R)+O}else{if(R.eE){y+=J(R.buffer+N,R)+O+m(M)}else{y+=J(R.buffer+N+M,R)+O}}while(L>1){O=D[D.length-2].cN?"</span>":"";y+=O;L--;D.length--}var r=D[D.length-1];D.length--;D[D.length-1].buffer="";if(r.starts){I(r.starts,"")}return R.rE}if(w(M,R)){throw"Illegal"}}var E=e[B];var D=[E.dM];var A=0;var x=0;var y="";try{var s,u=0;E.dM.buffer="";do{s=p(C,u);var t=G(s[0],s[1],s[2]);u+=s[0].length;if(!t){u+=s[1].length}}while(!s[2]);if(D.length>1){throw"Illegal"}return{r:A,keyword_count:x,value:y}}catch(H){if(H=="Illegal"){return{r:0,keyword_count:0,value:m(C)}}else{throw H}}}function g(t){var p={keyword_count:0,r:0,value:m(t)};var r=p;for(var q in e){if(!e.hasOwnProperty(q)){continue}var s=d(q,t);s.language=q;if(s.keyword_count+s.r>r.keyword_count+r.r){r=s}if(s.keyword_count+s.r>p.keyword_count+p.r){r=p;p=s}}if(r.language){p.second_best=r}return p}function i(r,q,p){if(q){r=r.replace(/^((<[^>]+>|\t)+)/gm,function(t,w,v,u){return w.replace(/\t/g,q)})}if(p){r=r.replace(/\n/g,"<br>")}return r}function n(t,w,r){var x=h(t,r);var v=a(t);var y,s;if(v){y=d(v,x)}else{return}var q=c(t);if(q.length){s=document.createElement("pre");s.innerHTML=y.value;y.value=k(q,c(s),x)}y.value=i(y.value,w,r);var u=t.className;if(!u.match("(\\s|^)(language-)?"+v+"(\\s|$)")){u=u?(u+" "+v):v}if(/MSIE [678]/.test(navigator.userAgent)&&t.tagName=="CODE"&&t.parentNode.tagName=="PRE"){s=t.parentNode;var p=document.createElement("div");p.innerHTML="<pre><code>"+y.value+"</code></pre>";t=p.firstChild.firstChild;p.firstChild.cN=s.cN;s.parentNode.replaceChild(p.firstChild,s)}else{t.innerHTML=y.value}t.className=u;t.result={language:v,kw:y.keyword_count,re:y.r};if(y.second_best){t.second_best={language:y.second_best.language,kw:y.second_best.keyword_count,re:y.second_best.r}}}function o(){if(o.called){return}o.called=true;var r=document.getElementsByTagName("pre");for(var p=0;p<r.length;p++){var q=b(r[p]);if(q){n(q,hljs.tabReplace)}}}function l(){if(window.addEventListener){window.addEventListener("DOMContentLoaded",o,false);window.addEventListener("load",o,false)}else{if(window.attachEvent){window.attachEvent("onload",o)}else{window.onload=o}}}var e={};this.LANGUAGES=e;this.highlight=d;this.highlightAuto=g;this.fixMarkup=i;this.highlightBlock=n;this.initHighlighting=o;this.initHighlightingOnLoad=l;this.IR="[a-zA-Z][a-zA-Z0-9_]*";this.UIR="[a-zA-Z_][a-zA-Z0-9_]*";this.NR="\\b\\d+(\\.\\d+)?";this.CNR="\\b(0[xX][a-fA-F0-9]+|(\\d+(\\.\\d*)?|\\.\\d+)([eE][-+]?\\d+)?)";this.BNR="\\b(0b[01]+)";this.RSR="!|!=|!==|%|%=|&|&&|&=|\\*|\\*=|\\+|\\+=|,|\\.|-|-=|/|/=|:|;|<|<<|<<=|<=|=|==|===|>|>=|>>|>>=|>>>|>>>=|\\?|\\[|\\{|\\(|\\^|\\^=|\\||\\|=|\\|\\||~";this.ER="(?![\\s\\S])";this.BE={b:"\\\\.",r:0};this.ASM={cN:"string",b:"'",e:"'",i:"\\n",c:[this.BE],r:0};this.QSM={cN:"string",b:'"',e:'"',i:"\\n",c:[this.BE],r:0};this.CLCM={cN:"comment",b:"//",e:"$"};this.CBLCLM={cN:"comment",b:"/\\*",e:"\\*/"};this.HCM={cN:"comment",b:"#",e:"$"};this.NM={cN:"number",b:this.NR,r:0};this.CNM={cN:"number",b:this.CNR,r:0};this.BNM={cN:"number",b:this.BNR,r:0};this.inherit=function(r,s){var p={};for(var q in r){p[q]=r[q]}if(s){for(var q in s){p[q]=s[q]}}return p}}();
// C++ language table: keyword and built_in maps plus the modes for comments,
// strings, numbers, preprocessor lines and STL container templates.
hljs.LANGUAGES.cpp=function(){var a={keyword:{"false":1,"int":1,"float":1,"while":1,"private":1,"char":1,"catch":1,"export":1,virtual:1,operator:2,sizeof:2,dynamic_cast:2,typedef:2,const_cast:2,"const":1,struct:1,"for":1,static_cast:2,union:1,namespace:1,unsigned:1,"long":1,"throw":1,"volatile":2,"static":1,"protected":1,bool:1,template:1,mutable:1,"if":1,"public":1,friend:2,"do":1,"return":1,"goto":1,auto:1,"void":2,"enum":1,"else":1,"break":1,"new":1,extern:1,using:1,"true":1,"class":1,asm:1,"case":1,typeid:1,"short":1,reinterpret_cast:2,"default":1,"double":1,register:1,explicit:1,signed:1,typename:1,"try":1,"this":1,"switch":1,"continue":1,wchar_t:1,inline:1,"delete":1,alignof:1,char16_t:1,char32_t:1,constexpr:1,decltype:1,noexcept:1,nullptr:1,static_assert:1,thread_local:1,restrict:1,_Bool:1,complex:1},built_in:{std:1,string:1,cin:1,cout:1,cerr:1,clog:1,stringstream:1,istringstream:1,ostringstream:1,auto_ptr:1,deque:1,list:1,queue:1,stack:1,vector:1,map:1,set:1,bitset:1,multiset:1,multimap:1,unordered_set:1,unordered_map:1,unordered_multiset:1,unordered_multimap:1,array:1,shared_ptr:1}};return{dM:{k:a,i:"</",c:[hljs.CLCM,hljs.CBLCLM,hljs.QSM,{cN:"string",b:"'\\\\?.",e:"'",i:"."},{cN:"number",b:"\\b(\\d+(\\.\\d*)?|\\.\\d+)(u|U|l|L|ul|UL|f|F)"},hljs.CNM,{cN:"preprocessor",b:"#",e:"$"},{cN:"stl_container",b:"\\b(deque|list|queue|stack|vector|map|set|bitset|multiset|multimap|unordered_map|unordered_set|unordered_multiset|unordered_multimap|array)\\s*<",e:">",k:a,r:10,c:["self"]}]}}}();
// R language table. `hljs.IMMEDIATE_RE` is not defined on the object built
// above, so each `e:` below is undefined and the compile step falls back to
// its default end pattern "\\B|\\b". The "\\.\\.\\." keyword pattern must stay
// on one line: a line break right after its first backslash turns into a
// string line continuation and changes the regex to `.\.\.`.
hljs.LANGUAGES.r={dM:{c:[hljs.HCM,{cN:"number",b:"\\b0[xX][0-9a-fA-F]+[Li]?\\b",e:hljs.IMMEDIATE_RE,r:0},{cN:"number",b:"\\b\\d+(?:[eE][+\\-]?\\d*)?L\\b",e:hljs.IMMEDIATE_RE,r:0},{cN:"number",b:"\\b\\d+\\.(?!\\d)(?:i\\b)?",e:hljs.IMMEDIATE_RE,r:1},{cN:"number",b:"\\b\\d+(?:\\.\\d*)?(?:[eE][+\\-]?\\d*)?i?\\b",e:hljs.IMMEDIATE_RE,r:0},{cN:"number",b:"\\.\\d+(?:[eE][+\\-]?\\d*)?i?\\b",e:hljs.IMMEDIATE_RE,r:1},{cN:"keyword",b:"(?:tryCatch|library|setGeneric|setGroupGeneric)\\b",e:hljs.IMMEDIATE_RE,r:10},{cN:"keyword",b:"\\.\\.\\.",e:hljs.IMMEDIATE_RE,r:10},{cN:"keyword",b:"\\.\\.\\d+(?![\\w.])",e:hljs.IMMEDIATE_RE,r:10},{cN:"keyword",b:"\\b(?:function)",e:hljs.IMMEDIATE_RE,r:2},{cN:"keyword",b:"(?:if|in|break|next|repeat|else|for|return|switch|while|try|stop|warning|require|attach|detach|source|setMethod|setClass)\\b",e:hljs.IMMEDIATE_RE,r:1},{cN:"literal",b:"(?:NA|NA_integer_|NA_real_|NA_character_|NA_complex_)\\b",e:hljs.IMMEDIATE_RE,r:10},{cN:"literal",b:"(?:NULL|TRUE|FALSE|T|F|Inf|NaN)\\b",e:hljs.IMMEDIATE_RE,r:1},{cN:"identifier",b:"[a-zA-Z.][a-zA-Z0-9._]*\\b",e:hljs.IMMEDIATE_RE,r:0},{cN:"operator",b:"<\\-(?!\\s*\\d)",e:hljs.IMMEDIATE_RE,r:2},{cN:"operator",b:"\\->|<\\-",e:hljs.IMMEDIATE_RE,r:1},{cN:"operator",b:"%%|~",e:hljs.IMMEDIATE_RE},{cN:"operator",b:">=|<=|==|!=|\\|\\||&&|=|\\+|\\-|\\*|/|\\^|>|<|!|&|\\||\\$|:",e:hljs.IMMEDIATE_RE,r:0},{cN:"operator",b:"%",e:"%",i:"\\n",r:1},{cN:"identifier",b:"`",e:"`",r:0},{cN:"string",b:'"',e:'"',c:[hljs.BE],r:0},{cN:"string",b:"'",e:"'",c:[hljs.BE],r:0},{cN:"paren",b:"[[({\\])}]",e:hljs.IMMEDIATE_RE,r:0}]}};
// Register the highlighter to run once on DOMContentLoaded / load; it then
// processes every <pre> whose first significant child is a <code> element.
hljs.initHighlightingOnLoad();
</script>
</head>
<body>
<h1>Mapreduce in R</h1>
<h2>My first mapreduce job</h2>
<p>Conceptually, mapreduce is not very different from a combination of <code>lapply</code>s and a <code>tapply</code>: transform elements of a list, compute an index &mdash; key in mapreduce jargon &mdash; and process the groups thus defined. Let&#39;s start with a simple <code>lapply</code>-style example (we use <code>sapply</code>, which additionally simplifies the result to a vector):</p>
<pre><code class="r"> small.ints = 1:1000
sapply(small.ints, function(x) x^2)
</code></pre>
<p>The example is trivial, just computing the first 1000 squares, but we just want to cover the basics here; there are more interesting examples later on. Now to the mapreduce equivalent:</p>
<pre><code class="r"> small.ints = to.dfs(1:1000)
mapreduce(input = small.ints, map = function(k,v) cbind(v,v^2))
</code></pre>
<p>This is all it takes to write your first mapreduce job in <code>rmr</code>. There are some differences that we will review, but the first thing to notice is that it isn&#39;t all that different, and just two lines of code. The first line puts the data into HDFS, where the bulk of the data has to reside for mapreduce to operate on. It is not possible to write out big data with <code>to.dfs</code>, not in a scalable way. <code>to.dfs</code> is nonetheless very useful for a variety of uses like writing test cases, learning and debugging. <code>to.dfs</code> can put the data in a file of your own choosing, but if you don&#39;t specify one it will create temp files and clean them up when done. The return value is something we call a <em>big data object</em>. You can assign it to variables, pass it to other <code>rmr</code> functions, mapreduce jobs or read it back in. It is a stub, that is the data is not in memory, only some information that helps finding and managing the data. This way you can refer to very large data sets whose size exceeds memory limits. </p>
<p>Now onto the second line. It has <code>mapreduce</code> replace <code>lapply</code>. We prefer named arguments with <code>mapreduce</code> because there&#39;s quite a few possible arguments, but it&#39;s not mandatory. The input is the variable <code>small.ints</code> which contains the output of <code>to.dfs</code>, that is a stub for our small number data set in its HDFS version, but it could be a file path or a list containing a mix of both. The function to apply, which is called a map function as opposed to the reduce function, which we are not using here, is a regular R function with a few constraints:</p>
<ol>
<li>It&#39;s a function of two arguments, a collection of keys and one of values</li>
<li>It returns key value pairs using the function <code>keyval</code>, which can have vectors, lists, matrices or data.frames as arguments; you can also return <code>NULL</code>. You can avoid calling <code>keyval</code> explicitly but the return value <code>x</code> will be converted with a call to <code>keyval(NULL,x)</code>. This is not allowed in the map function when the reduce function is specified and under no circumstance in the combine function, since specifying the key is necessary for the shuffle phase.</li>
</ol>
<p>In this example, we are not using the keys at all, only the values, but we still need both to support the general mapreduce case. The return value is a big data object, and you can pass it as input to other jobs or read it into memory (watch out, it will fail for big data!) with <code>from.dfs</code>. <code>from.dfs</code> is complementary to <code>to.dfs</code> and returns a key-value pair collection. <code>from.dfs</code> is useful in defining map reduce algorithms whenever a mapreduce job produces something of reasonable size, like a summary, that can fit in memory and needs to be inspected to decide on the next steps, or to visualize it. It is much more important than <code>to.dfs</code> in production work.</p>
<h2>My second mapreduce job</h2>
<p>We&#39;ve just created a simple job that was logically equivalent to a <code>lapply</code> but can run on big data. That job had only a map. Now to the reduce part. The closest equivalent in R is arguably a <code>tapply</code>. So here is the example from the R docs:</p>
<pre><code class="r"> groups = rbinom(32, n = 50, prob = 0.4)
tapply(groups, groups, length)
</code></pre>
<p>This creates a sample from the binomial and counts how many times each outcome occurred. Now onto the mapreduce equivalent:</p>
<pre><code class="r"> groups = to.dfs(groups)
from.dfs(mapreduce(input = groups, map = function(k,v) keyval(v, 1), reduce = function(k,vv) keyval(k, length(vv))))
</code></pre>
<p>First we move the data into HDFS with <code>to.dfs</code>. As we said earlier, this is not the normal way in which big data will enter HDFS; it is normally the responsibility of scalable data collection systems such as Flume or Sqoop. In that case we would just specify the HDFS path to the data as input to <code>mapreduce</code>. But in this case the input is the variable <code>groups</code> which contains a big data object, which keeps track of where the data is and does the clean up when the data is no longer needed. The map function ignores its key argument and emits each value as a key, paired with the constant <code>1</code>. Had a map function not been specified, it would have been set to the default, which is like an identity but consistent with the map requirements, that is <code>function(k,v) keyval(k,v)</code>. The reduce function takes two arguments, one is a key and the other is a collection of all the values associated with that key. It could be one of vector, list, data frame or matrix depending on what was returned by the map function. The idea is that if the user returned values of one class, we should preserve that through the shuffle phase. Like in the map case, the reduce function can return <code>NULL</code>, a key-value pair as generated by the function <code>keyval</code> or any other object <code>x</code> which is equivalent to <code>keyval(NULL, x)</code>. The default is no reduce, that is the output of the map is the output of mapreduce. In this case the keys are realizations of the binomial and the values are all <code>1</code> (please note recycling in action) and the only important thing is how many there are, so <code>length</code> gets the job done. Looking back at this second example, there are some small differences with <code>tapply</code> but the overall complexity is very similar.</p>
<h2>Word count</h2>
<p>The word count program has become a sort of &ldquo;hello world&rdquo; of the mapreduce world. For a review of how the same task can be accomplished in several languages, but always for mapreduce, see this <a href="http://blog.piccolboni.info/2011/04/looking-for-map-reduce-language.html">blog entry</a>.</p>
<p>We are defining a function, <code>wordcount</code>, that encapsulates this job. This may not look like a big deal but it is important. Our main goal was not simply to make it easy to run a mapreduce job but to make mapreduce jobs first class citizens of the R environment and to make it easy to create abstractions based on them. For instance, we wanted to be able to assign the result of a mapreduce job to a variable &mdash; and I mean <em>the result</em>, not some error code or diagnostics &mdash; and to create complex expressions including mapreduce jobs. We take the first step here by creating a function that is itself a job, can be chained with other jobs, executed in a loop etc. </p>
<p>Let&#39;s now look at the signature. </p>
<pre><code class="r">wordcount =
function (input, output = NULL, pattern = &quot; &quot;) {
</code></pre>
<p>There is an input and optional output and a pattern that defines what a word is. </p>
<p>The map function, as we know already, takes two arguments, a key and a value. The key here is not important, indeed always <code>NULL</code>. The value contains several lines of text, which get split according to a pattern. Here you can see that <code>pattern</code> is accessible in the mapper without any particular work on the programmer side and according to normal R scope rules. This apparent simplicity hides the fact that the map function is executed in a different interpreter and on a different machine than the <code>mapreduce</code> function. Behind the scenes the R environment is serialized, broadcast to the cluster and restored on each interpreter running on the nodes. For each word, a key value pair (<em>w</em>, 1) is generated with <code>keyval</code> and their collection is the return value of the mapper. </p>
<pre><code class="r">    wc.map =
      function(dummy, lines) {
        keyval(
          unlist(
            strsplit(
              x = lines,
              split = pattern)),
          1)}
</code></pre>
<pre><code class="r"> wc.reduce =
function(word, counts ) {
keyval(word, sum(counts))}
</code></pre>
<p>The reduce function takes a key and a collection of values, in this case a numeric vector, as input and simply sums up all the counts and returns the pair word, count using the same helper function, <code>keyval</code>. Finally, specifying the use of a combiner is necessary to guarantee the scalability of this algorithm. Now on to the mapreduce call.</p>
<pre><code class="r"> mapreduce(input = input ,
output = output,
input.format = &quot;text&quot;,
map = wc.map,
reduce = wc.reduce,
combine = T)}
</code></pre>
<p>The implementation defines map and reduce functions and then makes a single call to <code>mapreduce</code>. The map and reduce functions could be as well anonymous functions as they are used only once, but there is one advantage in naming them. <code>rmr</code> offers alternate backends, in particular one can switch off Hadoop altogether with <code>rmr.options(backend = &quot;local&quot;)</code>. While this is of no use for production work, as it offers no scalability, it is an amazing resource for learning and debugging as we are dealing with a local, run of the mill R program with the same semantics as when run on top of Hadoop. This, in combination with using named map and reduce functions, allows us to use <code>debug</code> to debug mapper and reducer the familiar way. </p>
<p>The input can be an HDFS path, the return value of <code>to.dfs</code> or another job or a list thereof &mdash; potentially, a mix of all three cases, as in <code>list(&quot;a/long/path&quot;, to.dfs(...), mapreduce(...), ...)</code>. The output can be an HDFS path but if it is <code>NULL</code> some temporary file will be generated and wrapped in a big data object, like the ones generated by <code>to.dfs</code>. In either event, the job will return the information about the output, either the path or the big data object. Therefore, we simply pass along the input and output of the <code>wordcount</code> function to the <code>mapreduce</code> call and return whatever it returns. That way the new function also behaves like a proper mapreduce job &mdash; for more details see <a href="https://github.com/RevolutionAnalytics/RHadoop/wiki/Writing-composable-mapreduce-jobs">Writing composable mapreduce jobs</a>. The <code>input.format</code> argument allows us to specify the format of the input. The default is based on R&#39;s own serialization functions and supports all R data types. In this case we just want to read a text file, so the <code>&quot;text&quot;</code> format will create key value pairs with a <code>NULL</code> key and a line of text as value. You can easily specify your own input and output formats and even accept and produce binary formats with the functions <code>make.input.format</code> and <code>make.output.format</code>. </p>
<h2>Logistic Regression</h2>
<p>Now on to an example from supervised learning, specifically logistic regression by gradient descent. Again we are going to create a function that encapsulates this algorithm. </p>
<pre><code class="r">logistic.regression = function(input, iterations, dims, alpha){
</code></pre>
<p>As you can see we have an input representing the training data. For simplicity we ask to specify a fixed number of iterations, but it would be only slightly more difficult to implement a convergence criterion. Then we need to specify the dimension of the problem, which is redundant because it can be inferred after seeing the first line of input, but we didn&#39;t want to put additional logic in the map function, and then we have the learning rate <code>alpha</code>.</p>
<pre><code class="r"> plane = t(rep(0, dims))
g = function(z) 1/(1 + exp(-z))
for (i in 1:iterations) {
gradient =
values(
from.dfs(
mapreduce(
input,
map = lr.map,
reduce = lr.reduce,
combine = T)))
plane = plane + alpha * gradient }
plane }
</code></pre>
<p>We start by initializing the separating plane and defining the logistic function. As before, those variables will be used inside the map function, that is they will travel across interpreter and processor and network barriers to be available where the developer needs them and where a traditional, meaning sequential, R developer expects them to be available according to scope rules &mdash; no boilerplate code and familiar, powerful behavior.</p>
<p>Then we have the main loop where computing the gradient of the loss function is the duty of a map reduce job, whose output is brought into main memory with a call to <code>from.dfs</code> &mdash; any intermediate result files are managed by the system, not you. The only important thing you need to know is that the gradient is going to fit in memory so we can call <code>from.dfs</code> to get it without exceeding available resources.</p>
<pre><code class="r"> lr.map =
function(dummy, M) {
Y = M[,&#39;y&#39;]
X = M[,c(&quot;x1&quot;,&quot;x2&quot;)]
keyval(1,
Y * X * g(-Y * as.numeric(X %*% t(plane))))}
</code></pre>
<p>The map function simply computes the contribution of a subset of points to the gradient. Please note the variables <code>g</code> and <code>plane</code> making their necessary appearance here without any work on the developer&#39;s part. The access here is read only but you could even modify them if you wanted &mdash; the semantics is copy on assign, which is consistent with how R works and easily supported by Hadoop. Since in the next step we just want to add everything together, we return a dummy, constant key for each value. Note the use of recycling in <code>keyval</code>.</p>
<pre><code class="r"> lr.reduce =
function(k, Z) keyval(k, t(as.matrix(apply(Z,2,sum))))
</code></pre>
<p>The reduce function is just a big sum. Since we have only one key, all the work will fall on one reducer and that&#39;s not scalable. Therefore, in this example, it&#39;s important to activate the combiner, in this case set to TRUE, which means same as the reducer. Since sums are associative and commutative that&#39;s all we need. <code>mapreduce</code> also accepts a distinct combiner function. Remember that a combiner&#39;s arguments can come from a map or a combine function and its return value can go to a combine or reduce function.</p>
<p>To make this example production-level there are several things one needs to do, like having a convergence criterion instead of a fixed number of iterations and an adaptive learning rate, but probably gradient descent just requires too many iterations to be the right approach in a big data context. But this example should give you all the elements to be able to implement, say, conjugate gradient instead. In general, when each iteration requires I/O of a large data set, the number of iterations needs to be modest and algorithms with O(log(N)) number of iterations are natural candidates, even if the work in each iteration may be more substantial.</p>
<h2>K-means</h2>
<p>We are now going to cover a simple but significant clustering algorithm and the complexity will go up just a little bit. To cheer yourself up, you can take a look at <a href="http://www.hortonworks.com/new-apache-pig-features-part-2-embedding/">this alternative implementation</a> which requires three languages, python, pig and java, to get the job done and is hailed as a model of simplicity.</p>
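<p>We are talking about k-means. This is not a production ready implementation, but should be illustrative of the power of this package. You simply cannot do this in pig or hive alone and it would take hundreds of lines of code in java. As before, we wrap everything in a function; the definitions that follow make up its body:</p>
<pre><code class="r">kmeans.mr =
  function(P, num.clusters, num.iter) {
</code></pre>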
<pre><code class="r"> dist.fun =
function(C, P) {
apply(C,
1,
function(x) matrix(rowSums((t(t(P) - x))^2),
ncol = length(x)))}
</code></pre>
<p>This is simply a distance function, the only noteworthy property of which is that it can compute all the distances between a matrix of centers <code>C</code> and a matrix of points <code>P</code> very efficiently: on my laptop it can do 10<sup>6</sup> points and 10<sup>2</sup> centers in 5 dimensions in approx. 16s. The only explicit iteration is the <code>apply</code> over the rows of <code>C</code>, but all the other operations are vectorized (i.e. loops are pushed to the C library), hence the speed.</p>
<pre><code class="r"> kmeans.map.1 =
function(k, P) {
nearest =
if(is.null(C)) {
sample(1:num.clusters, nrow(P), replace = T)}
else {
D = dist.fun(C, P)
nearest = max.col(-D)}
keyval(nearest, P) }
</code></pre>
<p>The role of the map function is to compute distances between some points and all centers and return for each point the closest center. It has two flavors controlled by the main <code>if</code>: the first iteration when no candidate centers are available and all the following ones. Please note that while the points are stored in HDFS and provided to the map function as its second argument, the centers are simply stored in a matrix and available in the map function because of normal scope rules. In the first iteration, each point is randomly assigned to a center, whereas in the following ones a min distance criterion is used. Finally notice the vectorized use of keyval whereby all the center-point pairs are returned in one statement (the correspondence is positional, with the second dimension used when present).</p>
<pre><code class="r"> kmeans.reduce.1 =
function(x, P) {
t(as.matrix(apply(P,2,mean)))}
</code></pre>
<p>The reduce function couldn&#39;t be simpler as it just computes column averages of a matrix of points sharing a center, which is the key. If you wonder why the <code>.1</code> in the name of these two functions, it&#39;s because we plan to explore some optimizations in future installments.</p>
<pre><code class="r">    # No centers yet: the first pass of the map function assigns points at random.
    C = NULL
    for(i in 1:num.iter ) {
      C =
        values(
          from.dfs(
            mapreduce(P, map = kmeans.map.1, reduce = kmeans.reduce.1)))
      # Centers that attracted no points are lost; when that happens, rebuild
      # num.clusters centers as random linear combinations of the survivors.
      if(nrow(C) &lt; num.clusters)
        C = matrix(rnorm(num.clusters * nrow(C)), ncol = nrow(C)) %*% C }
    C}
</code></pre>
<p>The main loop does nothing but bring into memory the result of a mapreduce job with the two above functions as mapper and reducer and the big data object with the points as input. Once the keys are discarded, the values form a matrix which becomes the new set of centers. The last two lines before the return value are a heuristic to keep the number of centers the desired one (when centers are nearest to no points, they are lost). To run this function we need some data:</p>
<pre><code class="r"> input =
do.call(rbind, rep(list(matrix(rnorm(10, sd = 10), ncol=2)), 20)) +
matrix(rnorm(200), ncol =2)
</code></pre>
<p>That is, create a large matrix with a few rows repeated many times and then add some noise. Finally, the function call:</p>
<pre><code class="r"> kmeans.mr(to.dfs(input), num.clusters = 12, num.iter= 5)
</code></pre>
<p>With a little extra work you can even get pretty visualizations like <a href="kmeans.gif">this one</a>.</p>
<h2>Linear Least Squares</h2>
<p>This is an example of a hybrid mapreduce-conventional solution to a well known problem. We will start with a mapreduce job that results in a smaller data set that can be brought into main memory and processed in a single R instance. This is straightforward in rmr because of the simple primitive that transfers data into memory, <code>from.dfs</code>, and the R-native data model. This is in contrast with hybrid pig-java-python solutions where mapping data types from one language to the other is a time-consuming and error-prone chore the developer has to deal with.</p>
<p>Specifically, we want to solve LLS under the assumption that we have too many data points to fit in memory but not such a huge number of variables that we need to implement the whole process as map reduce job. This is the basic equation we want to solve in the least square sense: </p>
<p><strong>X</strong> <strong>&beta;</strong> = <strong>y</strong></p>
<p>We are going to do it by using the function solve as in the following expression, that is solving the normal equations.</p>
<pre>
solve(t(X)%*%X, t(X)%*%y)
</pre>
<p>But let&#39;s assume that X is too big to fit in memory, so we have to compute the transpose and matrix products using map reduce, then we can do the solve as usual on the results. This is our general plan.</p>
<p>Let&#39;s get some data first, potentially big data matrix <code>X</code> and a regular vector <code>y</code>:</p>
<pre><code class="r">X = to.dfs(matrix(rnorm(2000), ncol = 10))
y = as.matrix(rnorm(200))
</code></pre>
<p>The next is a reusable reduce function that just sums a list of matrices, ignores the key.</p>
<pre><code class="r">Sum = function(k, YY) keyval(1, list(Reduce(&#39;+&#39;, YY)))
</code></pre>
<p>The big matrix is passed to the mapper in chunks of complete rows. Smaller cross-products are computed for these submatrices and passed on to a single reducer, which sums them together. Since we have a single key a combiner is mandatory and since matrix sum is associative and commutative we certainly can use it here.</p>
<pre><code class="r">XtX =
values(
from.dfs(
mapreduce(
input = X,
map =
function(k, Xi)
keyval(1, list(t(Xi) %*% Xi)),
reduce = Sum,
combine = TRUE)))[[1]]
</code></pre>
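<p>Pretty much the same goes for the vector <code>y</code>, which is made available to the nodes according to normal scope rules. Note that each chunk <code>Xi</code> is multiplied by the whole of <code>y</code>, so the product is conformable only when the mapper receives all the rows of <code>X</code> in a single chunk, as is presumably the case for this small data set; for a matrix split over several chunks, the rows of <code>y</code> matching each chunk would have to be selected first.</p>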
<pre><code class="r">Xty =
values(
from.dfs(
mapreduce(
input = X,
map = function(k, Xi)
keyval(1, list(t(Xi) %*% y)),
reduce = Sum,
combine = TRUE)))[[1]]
</code></pre>
<p>And finally we just need to call <code>solve</code>.</p>
<pre><code class="r">solve(XtX, Xty)
</code></pre>
<h3>Related Links</h3>
<ul>
<li><a href="https://github.com/RevolutionAnalytics/RHadoop/wiki/Comparison-of-high-level-languages-for-mapreduce%3A-k-means">Comparison of high level languages for mapreduce: k means</a></li>
<li><a href="https://github.com/RevolutionAnalytics/RHadoop/wiki/Changelog">Changelog</a></li>
<li><a href="https://github.com/RevolutionAnalytics/RHadoop/wiki/Design-philosophy">Design philosophy</a></li>
<li><a href="https://github.com/RevolutionAnalytics/RHadoop/wiki/Efficient-rmr-techniques">Efficient rmr techniques</a></li>
<li><a href="https://github.com/RevolutionAnalytics/RHadoop/wiki/Writing-composable-mapreduce-jobs">Writing composable mapreduce jobs</a></li>
<li><a href="https://github.com/RevolutionAnalytics/RHadoop/wiki/Use-cases">Use cases</a></li>
<li><a href="https://github.com/RevolutionAnalytics/RHadoop/wiki/Getting-data-in-and-out">Getting data in and out</a></li>
<li><a href="https://github.com/RevolutionAnalytics/RHadoop/wiki/FAQ">FAQ</a></li>
</ul>
</body>
</html>
Jump to Line
Something went wrong with that request. Please try again.