all chunks externalized, now need to bring up to 1.3
piccolbo committed Jul 18, 2012
1 parent 75a5452 commit 5ffbba1
Showing 4 changed files with 186 additions and 163 deletions.
75 changes: 15 additions & 60 deletions rmr/pkg/docs/getting-data-in-and-out.Rmd
@@ -1,4 +1,5 @@
`r read_chunk('../tests/getting-data-in-and-out.R')`
`r read_chunk('../tests/wordcount.R')`
`r opts_chunk$set(echo=TRUE, eval=FALSE, cache=FALSE, tidy=FALSE)`
```{r cache=FALSE, eval=TRUE, echo=FALSE, results='hide', message=FALSE}
library(rmr)
@@ -34,117 +35,71 @@ A format is a triple. You can create one with `make.input.format`, for instance:

The `mode` element can be `text` or `binary`. The `format` element is a function that takes a connection, reads `nrows` records and creates a key-value pair. The `streaming.format` element is a fully qualified Java class (as a string) that writes to the connection the format function reads from. The default is `TextInputFormat`; `org.apache.hadoop.streaming.AutoInputFormat` is also useful. Once you have these three elements you can pass them to `make.input.format` and get an object that can be used as the `input.format` option to `mapreduce` and the `format` option to `from.dfs`. On the output side the situation is reversed, with the R function acting first and the Java class taking over from there.

```{r getting-data.make.output.format.csv, echo = TRUE, eval=TRUE, comment="", cache=FALSE}
```
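
As an illustration, here is a sketch of a custom input format for tab-separated text built with `make.input.format`. The `(con, nrecs)` signature and the end-of-input convention are assumptions modeled on the built-in csv reader visible in the rendered HTML below, so treat this as a sketch rather than the definitive 1.3 API.

```r
library(rmr)

# Sketch of a custom tsv input format (assumed signature: the format
# function receives a connection and a record count, and returns NULL
# at end of input, as the built-in readers appear to do).
tsv.format <-
  make.input.format(
    mode = "text",
    format = function(con, nrecs) {
      # read one record per call for simplicity; nrecs is ignored here
      line <- readLines(con, n = 1)
      if (length(line) == 0) NULL
      else {
        fields <- strsplit(line, "\t")[[1]]
        keyval(fields[1], fields[-1]) # first column is the key
      }
    },
    streaming.format = NULL) # NULL keeps the default TextInputFormat
```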

R data types work natively without additional effort (matrices, functions, models and promises are covered from v1.2; hopefully all bases are covered now)

```{r getting.data.generic.list}
```
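
The externalized chunk builds a list mixing several R types; the version visible in the rendered HTML below is:

```r
my.data <- list(TRUE, list("nested list", 7.2), seq(1:3), letters[1:4],
                matrix(1:25, nrow = 5, ncol = 5))
```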

Put into HDFS:
```r
hdfs.data <- to.dfs(my.data)
```
```{r getting-data.put.into.dfs}
```
`my.data` is coerced to a list, and each element of the list becomes a record.

Compute the frequency of object lengths. This requires only an input, a mapper and a reducer. Note that `my.data` is passed into the mapper, record by record, as `key = NULL, value = item`.

```r
result <- mapreduce(input = hdfs.data,
                    map = function(k, v) keyval(length(v), 1),
                    reduce = function(k, vv) keyval(k, sum(unlist(vv))))

from.dfs(result)
```
```{r getting-data.object.length.frequency}
```

However, when using data that was not generated with `rmr` (txt, csv, tsv, JSON, log files, etc.), it is necessary to specify an input format.

There is a third option in between the simplicity of a string like "csv" and the full power of `make.input.format`: passing the format string to `make.input.format` together with additional arguments that pin down the specific dialect of `csv`, as in `make.input.format("csv", sep = ';')`. `csv` is the only format offering this possibility, as the others are fully specified; it takes the same options as `read.table`. The same holds on the output side, with `write.table` being the model.
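
For example, and assuming the output side accepts dialect options symmetrically (as the `write.table` remark suggests), a minimal sketch:

```r
# csv dialects: extra arguments follow read.table on the input side
semicolon.csv.in <- make.input.format("csv", sep = ";")
# ... and write.table on the output side
semicolon.csv.out <- make.output.format("csv", sep = ";")
```

These objects then go wherever a format is expected: the `input.format` option to `mapreduce` or the `format` option to `from.dfs`.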

[Wordcount](https://github.com/RevolutionAnalytics/RHadoop/blob/master/rmr/pkg/tests/wordcount.R): please note the use of `input.format = "text"`.
```{r wordcount}
```
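
The externalized chunk is reproduced here from the rendered HTML further down; note the combiner and `input.format = "text"`:

```r
wordcount = function(input, output = NULL, pattern = " ") {
  mapreduce(input = input,
            output = output,
            input.format = "text",
            map = function(k, v) {
              lapply(
                strsplit(x = v, split = pattern)[[1]],
                function(w) keyval(w, 1))},
            reduce = function(k, vv) {
              keyval(k, sum(unlist(vv)))},
            combine = T)}
```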

To define your own `input.format` (e.g. to handle tsv):


<hr>
### under revision from here to end

```r
myTSVReader <- function(line) {
  delim <- strsplit(line, split = "\t")[[1]]
  keyval(delim[[1]], delim[-1]) # first column is the key; remaining column indexes shift down by 1
}
```
```{r getting-data.tsv.reader}
```

Frequency count on column two of the tsv data; the data arrives in the map function already delimited.

```r
mrResult <- mapreduce(input = hdfsData,
                      textinputformat = myTSVReader,
                      map = function(k, v) keyval(v[[1]], 1),
                      reduce = function(k, vv) keyval(k, sum(unlist(vv))))
```
```{r getting-data.frequency.count}
```

Or, if you want named columns (this would be specific to your data file):

```r
mySpecificTSVReader <- function(line) {
  delim <- strsplit(line, split = "\t")[[1]]
  keyval(delim[[1]], list(location = delim[[2]], name = delim[[3]], value = delim[[4]]))
}
```
```{r getting-data.named.columns}
```

You can then use the list names to access the column of interest directly for manipulation:
```r
mrResult <- mapreduce(input = hdfsData,
                      textinputformat = mySpecificTSVReader,
                      map = function(k, v) {
                        if (v$name == "blarg") {
                          # v$value comes in as a string, so convert before taking the log
                          keyval(k, log(as.numeric(v$value)))
                        }
                      },
                      reduce = function(k, vv) keyval(k, mean(unlist(vv))))
```
```{r getting-data.named.column.access}
```

To get your data out (say you read an input file, apply column transformations, add columns, and want to write out a new csv file), you must define a `textoutputformat`, just as you defined a `textinputformat`.

```r
myCSVOutput <- function(k, v) {
  keyval(paste(k, paste(v, collapse = ","), sep = ","))
}
```
```{r getting-data.csv.output}
```
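
Purely as a local illustration (no Hadoop involved), this is the line `myCSVOutput` builds for a record:

```r
k <- "key1"
v <- list(10, "x", 3.5)
paste(k, paste(v, collapse = ","), sep = ",")
# [1] "key1,10,x,3.5"
```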

In v1.1 this should be as simple as

```r
myCSVOutput = csvtextoutputformat(sep = ",")
```
```{r getting-data.csv.output.simpler}
```

This time we provide the `output` argument so the result can be pulled out of hdfs (you cannot `hdfs.get` an RHadoop big data object).

```r
mapreduce(input = hdfsData,
          output = "/rhadoop/output/",
          textoutputformat = myCSVOutput,
          map = function(k, v) {
            # complicated function here
          },
          reduce = function(k, v) {
            # complicated function here
          })
```
```{r getting-data.explicit.output.arg}
```

Save the output to the local filesystem:

```r
hdfs.get("/rhadoop/output/", "/home/rhadoop/filesystemoutput/")
```
```{r getting-data.save.output}
```

/home/rhadoop/filesystemoutput/ will now contain your CSV data (likely split into multiple part- files, in the usual Hadoop way).
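
To pull the exported records back into a local R session you can, for instance, bind the part files together. The `part-` prefix is standard Hadoop output naming, but the exact file layout here is an assumption:

```r
parts <- list.files("/home/rhadoop/filesystemoutput",
                    pattern = "^part-", full.names = TRUE)
local.data <- do.call(rbind, lapply(parts, read.csv, header = FALSE))
```
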
95 changes: 47 additions & 48 deletions rmr/pkg/docs/getting-data-in-and-out.html
@@ -226,7 +226,7 @@ <h2>Custom formats</h2>
NULL
else keyval(NULL, df, vectorized = nrecs &gt; 1)
}
&lt;environment: 0x104865a60&gt;

$streaming.format
NULL
@@ -235,39 +235,35 @@

<p>The <code>mode</code> element can be <code>text</code> or <code>binary</code>. The <code>format</code> element is a function that takes a connection, reads <code>nrows</code> records and creates a key-value pair. The <code>streaming.format</code> element is a fully qualified Java class (as a string) that writes to the connection the format function reads from. The default is <code>TextInputFormat</code>; <code>org.apache.hadoop.streaming.AutoInputFormat</code> is also useful. Once you have these three elements you can pass them to <code>make.input.format</code> and get an object that can be used as the <code>input.format</code> option to <code>mapreduce</code> and the <code>format</code> option to <code>from.dfs</code>. On the output side the situation is reversed, with the R function acting first and the Java class taking over from there.</p>

<pre><code class="r">&gt; make.output.format(&quot;csv&quot;)
$mode
<pre><code class="r">make.output.format(&quot;csv&quot;)
</code></pre>

<pre><code>$mode
[1] &quot;text&quot;

$format
function (k, v, con, vectorized)
write.table(file = con, x = if (is.null(k)) v else cbind(k, v),
..., row.names = FALSE, col.names = FALSE)
&lt;environment: 0x102e5ba70&gt;

$streaming.format
NULL

</code></pre>

<p>R data types work natively without additional effort (matrices, functions, models and promises are covered from v1.2; hopefully all bases are covered now)</p>

<pre><code class="r">my.data &lt;- list(TRUE, list(&quot;nested list&quot;, 7.2), seq(1:3), letters[1:4], matrix(1:25, nrow = 5,ncol = 5))
</code></pre>

<p>Put into HDFS:</p>

<pre><code class="r">hdfs.data &lt;- to.dfs(my.data)
</code></pre>

<p><code>my.data</code> is coerced to a list, and each element of the list becomes a record.</p>

<p>Compute the frequency of object lengths. This requires only an input, a mapper and a reducer. Note that <code>my.data</code> is passed into the mapper, record by<br/>
record, as <code>key = NULL, value = item</code>.</p>

<pre><code class="r">result &lt;- mapreduce(input = hdfs.data,
map = function(k,v) keyval(length(v), 1),
reduce = function(k,vv) keyval(k, sum(unlist(vv))))

from.dfs(result)
</code></pre>
@@ -276,56 +272,60 @@ <h2>Custom formats</h2>

<p>There is a third option in between the simplicity of a string like &ldquo;csv&rdquo; and the full power of <code>make.input.format</code>: passing the format string to <code>make.input.format</code> together with additional arguments that pin down the specific dialect of <code>csv</code>, as in <code>make.input.format(&quot;csv&quot;, sep = &#39;;&#39;)</code>. <code>csv</code> is the only format offering this possibility, as the others are fully specified; it takes the same options as <code>read.table</code>. The same holds on the output side, with <code>write.table</code> being the model.</p>

<p><a href="https://github.com/RevolutionAnalytics/RHadoop/blob/master/rmr/pkg/tests/wordcount.R">Wordcount</a>: please note the use of <code>input.format = &quot;text&quot;</code>.</p>
<pre><code class="r">wordcount = function (input, output = NULL, pattern = &quot; &quot;) {
mapreduce(input = input ,
output = output,
input.format = &quot;text&quot;,
map = function(k,v) {
lapply(
strsplit(
x = v,
split = pattern)[[1]],
function(w) keyval(w,1))},
reduce = function(k,vv) {
keyval(k, sum(unlist(vv)))},
combine = T)}
</code></pre>

<p>To define your own <code>input.format</code> (e.g. to handle tsv):</p>

<hr>

<h3>under revision from here to end</h3>

<pre><code class="r">myTSVReader &lt;- function(line){
delim &lt;- strsplit(line, split = &quot;\t&quot;)[[1]]
keyval(delim[[1]], delim[-1])} # first column is the key, note that column indexes moved by 1
</code></pre>

<p>Frequency count on column two of the tsv data; the data arrives in the map function already delimited.</p>

<pre><code class="r">mrResult &lt;- mapreduce(input = hdfsData,
                      textinputformat = myTSVReader,
                      map = function(k,v) keyval(v[[1]], 1),
                      reduce = function(k,vv) keyval(k, sum(unlist(vv))))
</code></pre>

<p>Or, if you want named columns (this would be specific to your data file):</p>

<pre><code class="r">mySpecificTSVReader &lt;- function(line){
delim &lt;- strsplit(line, split = &quot;\t&quot;)[[1]]
keyval(delim[[1]], list(location = delim[[2]], name = delim[[3]], value = delim[[4]]))}
</code></pre>

<p>You can then use the list names to access the column of interest directly for manipulation:</p>

<pre><code class="r">mrResult &lt;- mapreduce(input = hdfsData,
textinputformat = mySpecificTSVReader,
map = function(k, v) {
if (v$name == &quot;blarg&quot;){
keyval(k, log(v$value))
}
},
reduce = function(k, vv) keyval(k, mean(unlist(vv)))
)
<pre><code class="r"> mrResult &lt;- mapreduce(input = hdfsData,
textinputformat = mySpecificTSVReader,
map = function(k, v) {
if (v$name == &quot;blarg&quot;){
keyval(k, log(v$value))
}
},
reduce = function(k, vv) keyval(k, mean(unlist(vv))))
</code></pre>

<p>To get your data out (say you read an input file, apply column transformations, add columns, and want to write out a new csv file),<br/>
you must define a <code>textoutputformat</code>, just as you defined a <code>textinputformat</code>.</p>

<pre><code class="r">myCSVOutput &lt;- function(k, v){
keyval(paste(k, paste(v, collapse = &quot;,&quot;), sep = &quot;,&quot;))}
</code></pre>

<p>In v1.1 this should be as simple as</p>
@@ -336,15 +336,14 @@ <h3>under revision from here to end</h3>
<p>This time we provide the output argument so the result can be pulled out of hdfs (you cannot <code>hdfs.get</code> an RHadoop big data object).</p>

<pre><code class="r">mapreduce(input = hdfsData,
output = &quot;/rhadoop/output/&quot;,
textoutputformat = myCSVOutput,
map = function(k,v){
# complicated function here
},
reduce = function(k,v) {
#complicated function here
})
</code></pre>

<p>Save output to the local filesystem</p>
