Example R protocol use with RHadoop

ChappedSky edited this page Apr 17, 2013 · 1 revision

Drake can be particularly useful for orchestrating data-science tasks you may want to do in R. R has good support for repeatable analysis and reporting, and it integrates well with Hadoop. This tutorial shows a simple example of using Drake to structure the process of fetching some web data, formatting it with R, generating a simple report with R, and then passing the data to RHadoop for further analysis.

First, create your Drakefile with a step that fetches some data.

Drakefile:

;
; This Drake workflow lets users search Freebase for entities by name,
; then summarizes the types of the resulting entities.
;

search_term="drake" 
;This step fetches the freebase search result based on your search term
$[search_term].json <- 
    echo 'https://www.googleapis.com/freebase/v1/search?query=$[search_term]&output=(type)' > tmp_url
    cat tmp_url
    wget -i tmp_url -O $OUTPUT

To run only this step (its output is drake.json, since search_term is "drake"), run

drake =drake.json

which should give you a json file with the following structure:

{"status":"200 OK","result":[{
    "mid":"/m/034ks",
    "id":"/en/galileo_galilei",
    "name":"Galileo Galilei",
    "notable":{
        "name":"Physicist",
        "id":"/m/05snw"},
    "lang":"en",
    "score":112.919083,
    "output":{
        "type":{
            "/type/object/type":[{
                "id":"/book/author",
                "mid":"/m/01xryvt",
                "name":"Author"}]
                }
            }
            }]}

Next, we use Drake's R protocol to turn the JSON result from Freebase into a data frame with the name and type of each result.

search_data.csv <- $[search_term].json [R]
    if(!require(RJSONIO)){
        install.packages("RJSONIO")
        library(RJSONIO)
    }
    if(!require(plyr)){
        install.packages("plyr")
        library(plyr)
    }
    search_result <- fromJSON('$[INPUT]', simplify=FALSE)
    search_types <- ldply(search_result$result, function(x){
        data.frame(name=x$name,
                   type=x$output$type[[1]][[1]]$name)
    })
    write.csv(search_types, file='$[OUTPUT]', row.names=FALSE)

The output, search_data.csv, will then give you a data frame with the entities and their types that looks like this:

"name","type"
"Galileo Galilei","Author"
"Drake","Musical Artist"
"Drake University","Organization"
"Iceman","Fictional Character"

Once we've formatted our data, we want to do some super important reporting. Since in this case we're just printing to stdout, we name the step with a tag instead of an output file:

%report_output_types <- search_data.csv [R]
    input_data <- read.csv("$[INPUT]")
    print(table(input_data$type))
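
Given the sample rows shown earlier, you can preview what this report step prints by running the same table() call locally. The data frame below is hand-built from the example CSV and is only illustrative:

```r
# Rebuild the sample search_data.csv contents from the tutorial (illustrative only)
input_data <- data.frame(
  name = c("Galileo Galilei", "Drake", "Drake University", "Iceman"),
  type = c("Author", "Musical Artist", "Organization", "Fictional Character")
)

# Same summary the report step prints: a count of entities per type
print(table(input_data$type))
```

With this sample each type appears exactly once, so the report is a table of four 1s.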

One of the biggest advantages of using R is the excellent RHadoop collection of packages, which lets R programmers easily create streaming MapReduce jobs from within R. The following is a simple example of a wordcount on the data we've already collected.

First, put the data into Hadoop:

;example integration of RHadoop
hdfs:/data/drake_data <- search_data.csv
    hadoop fs -put $INPUT $OUTPUT

Then, we create a simple wordcount MapReduce job with RHadoop:

hdfs:/data/wordcount_mr_result <- hdfs:/data/drake_data [R]
    library(rmr2)
    mapreduce(
        input='$[INPUT]',
        output='$[OUTPUT]',
        input.format="text",
        map=function(k,v){
            keyval(key=unlist(strsplit(v,split = ",")),val=1)
        },
        reduce=function(k,v){
            keyval(key=k,val=sum(unlist(v)))
        }
    )
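
If you want to sanity-check the map/reduce logic without a Hadoop cluster, the same wordcount can be sketched in plain R. The sample lines below stand in for the CSV stored in HDFS and are assumptions for illustration, not real cluster output:

```r
# Stand-in for the lines of the CSV in HDFS (illustrative only)
lines <- c("Drake,Musical Artist", "Drake University,Organization")

# "map" phase: split each line on commas, emitting one token per field
tokens <- unlist(strsplit(lines, split = ","))

# "reduce" phase: sum a 1 for every occurrence of each distinct token
counts <- tapply(rep(1, length(tokens)), tokens, sum)
print(counts)
```

This mirrors what the rmr2 job does: the map emits (token, 1) pairs and the reduce sums the values per key, only here everything runs in a single local R session.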

Run Drake:

$ drake

And here's the entire file again:

search_term="drake" 


$[search_term].json <- 
    echo 'https://www.googleapis.com/freebase/v1/search?query=$[search_term]&output=(type)' > tmp_url
    cat tmp_url
    wget -i tmp_url -O $OUTPUT

search_data.csv <- $[search_term].json [R]
    if(!require(RJSONIO)){
        install.packages("RJSONIO")
        library(RJSONIO)
    }
    if(!require(plyr)){
        install.packages("plyr")
        library(plyr)
    }
    search_result <- fromJSON('$[INPUT]', simplify=FALSE)
    search_types <- ldply(search_result$result, function(x){
        data.frame(name=x$name,
                   type=x$output$type[[1]][[1]]$name)
    })
    write.csv(search_types, file='$[OUTPUT]', row.names=FALSE)


%report_output_types <- search_data.csv [R]
    input_data <- read.csv("$[INPUT]")
    print(table(input_data$type))


;example integration of RHadoop
hdfs:/data/drake_data <- search_data.csv
    hadoop fs -put $INPUT $OUTPUT

;wordcount of items in result set
hdfs:/data/wordcount_mr_result <- hdfs:/data/drake_data [R]
    library(rmr2)
    mapreduce(
        input='$[INPUT]',
        output='$[OUTPUT]',
        input.format="text",
        map=function(k,v){
            keyval(key=unlist(strsplit(v,split = ",")),val=1)
        },
        reduce=function(k,v){
            keyval(key=k,val=sum(unlist(v)))
        }
    )