Skip to content

Commit

Permalink
Code for RHadoop random forests
Browse files Browse the repository at this point in the history
  • Loading branch information
laserson committed Feb 20, 2013
1 parent 91e6741 commit eb1a3cd
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 0 deletions.
36 changes: 36 additions & 0 deletions src/debugging.R
@@ -0,0 +1,36 @@
# Debug run of the sampling / fitting job against the small training CSV.
# poisson.subsample and fit.trees are not defined in this script and must
# already exist in the session. output = NULL means no explicit output path
# is given; the value returned by mapreduce() is kept in `a`.
a <- mapreduce(
  input = "~/temp/training.small.csv",
  input.format = "text",
  # output.format = "text",
  output = NULL,
  map = poisson.subsample,
  reduce = fit.trees
)

# Fetch the job's result into the R session for inspection.
b <- from.dfs(a)


# Read the small training CSV straight into a data frame for local debugging.
# The schema (column.names) is not defined in this script and must already
# exist in the session.
#
# MachineID is dropped (colClasses "NULL"); the columns named in colClasses are
# forced to the given type; all remaining columns are type-guessed by
# read.table().
training.data <- read.table(
  "~/temp/training.small.csv",
  header = FALSE,
  sep = ",",
  quote = "\"",
  row.names = NULL,
  col.names = column.names,
  fill = TRUE,
  na.strings = c("NA"),
  # The default for stringsAsFactors became FALSE in R 4.0.0. Request factors
  # explicitly so the untyped categorical columns do not silently arrive as
  # plain character vectors on newer R versions.
  stringsAsFactors = TRUE,
  colClasses = c(
    MachineID = "NULL",
    SalePrice = "numeric",
    YearMade = "numeric",
    MachineHoursCurrentMeter = "numeric",
    ageAtSale = "numeric",
    saleYear = "numeric",
    ModelCount = "numeric",
    MfgYear = "numeric",
    ModelID.x = "factor",
    ModelID.y = "factor",
    fiManufacturerID = "factor",
    datasource = "factor",
    auctioneerID = "factor",
    saledatenumeric = "numeric",
    saleDay = "factor",
    Stick_Length = "numeric"
  )
)

# Fit a small forest directly on the locally loaded data frame.
# Note: this reuses the name `a`, replacing the value assigned by the
# mapreduce() call earlier in this script.
a <- randomForest(
  formula = model.formula,
  data = training.data,
  ntree = 10,
  na.action = na.roughfix
)
120 changes: 120 additions & 0 deletions src/fitRandomForest.R
@@ -0,0 +1,120 @@
# Fits a Random Forest to the Bulldozer data via Hadoop

library(rmr2)
library(randomForest)

# PARAMETERS
# Author's note: RHadoop packages the R environment defined here and ships it
# to every mapper/reducer, so these values need no Hadoop configuration
# variables or distributed cache.

# Number of samples (one per model) that each mapper emits.
num.models <- 50
# Poisson rate used when drawing how many times each input row is included
# in a given model's sample.
frac.per.model <- 0.1

# Schema of the header-less input CSV, set by hand.
# Originally produced with dput(names(training.data)); order matters because
# the names are assigned to columns positionally.
column.names <- c(
  # key, target and transaction-level columns (".x" = merge suffix)
  "MachineID", "SalePrice", "ModelID.x", "datasource", "auctioneerID",
  "YearMade", "MachineHoursCurrentMeter", "UsageBand", "saledate",
  "fiModelDesc.x", "fiBaseModel.x", "fiSecondaryDesc.x",
  "fiModelSeries.x", "fiModelDescriptor.x", "ProductSize",
  "fiProductClassDesc.x", "state", "ProductGroup.x", "ProductGroupDesc.x",
  # equipment configuration columns
  "Drive_System", "Enclosure", "Forks", "Pad_Type", "Ride_Control",
  "Stick", "Transmission", "Turbocharged", "Blade_Extension",
  "Blade_Width", "Enclosure_Type", "Engine_Horsepower", "Hydraulics",
  "Pushblock", "Ripper", "Scarifier", "Tip_Control", "Tire_Size",
  "Coupler", "Coupler_System", "Grouser_Tracks", "Hydraulics_Flow",
  "Track_Type", "Undercarriage_Pad_Width", "Stick_Length", "Thumb",
  "Pattern_Changer", "Grouser_Type", "Backhoe_Mounting", "Blade_Type",
  "Travel_Controls", "Differential_Type", "Steering_Controls",
  # derived sale-date and per-model features
  "saledatenumeric", "ageAtSale", "saleYear", "saleMonth", "saleDay",
  "saleWeekday", "MedianModelPrice", "ModelCount",
  # columns carrying the ".y" merge suffix, plus manufacturer / size info
  "ModelID.y", "fiModelDesc.y", "fiBaseModel.y", "fiSecondaryDesc.y",
  "fiModelSeries.y", "fiModelDescriptor.y", "fiProductClassDesc.y",
  "ProductGroup.y", "ProductGroupDesc.y", "MfgYear", "fiManufacturerID",
  "fiManufacturerDesc", "PrimarySizeBasis", "PrimaryLower", "PrimaryUpper"
)

# Variables actually used to build the model: SalePrice is the response,
# everything on the right-hand side is a predictor.
# Author's note: randomForest doesn't like missing data, so some variables
# are simply left out.
# TODO
model.formula <- SalePrice ~
  datasource +
  auctioneerID +
  YearMade +
  saledatenumeric +
  ProductSize +
  ProductGroupDesc.x +
  Enclosure +
  Hydraulics +
  ageAtSale +
  saleYear +
  saleMonth +
  saleDay +
  saleWeekday +
  MedianModelPrice +
  ModelCount +
  MfgYear
# target <- "SalePrice"
# predictors <- c("datasource", )

# Helper: parse raw CSV text into a data frame.
#
# Args:
#   raw: character vector of header-less CSV lines laid out according to the
#        file-level `column.names` schema.
#
# Returns:
#   A data.frame with one row per input record. MachineID is dropped
#   (colClasses "NULL"), the columns named in colClasses are forced to the
#   given type, and every other column is type-guessed by read.table() with
#   strings converted to factors.
parse.raw <- function(raw) {
  # textConnection() returns an already-open connection, and read.table()
  # only closes connections it opened itself. Close it explicitly so repeated
  # calls do not leak one connection each.
  con <- textConnection(raw)
  on.exit(close(con), add = TRUE)

  read.table(
    con,
    header = FALSE,
    sep = ",",
    quote = "\"",
    row.names = NULL,
    col.names = column.names,
    fill = TRUE,
    na.strings = c("NA"),
    # The default for stringsAsFactors became FALSE in R 4.0.0. Request
    # factors explicitly so untyped categorical columns keep arriving as
    # factors regardless of the R version.
    stringsAsFactors = TRUE,
    colClasses = c(
      MachineID = "NULL",
      SalePrice = "numeric",
      YearMade = "numeric",
      MachineHoursCurrentMeter = "numeric",
      ageAtSale = "numeric",
      saleYear = "numeric",
      ModelCount = "numeric",
      MfgYear = "numeric",
      ModelID.x = "factor",
      ModelID.y = "factor",
      fiManufacturerID = "factor",
      datasource = "factor",
      auctioneerID = "factor",
      saledatenumeric = "numeric",
      saleDay = "factor",
      Stick_Length = "numeric"
    )
  )
}

# MAP function
#
# Parses one chunk of the input CSV and emits, for each of the num.models
# models, a Poisson-weighted sample of the chunk's rows keyed by the model
# index.
#
# Args:
#   k: input key (not used).
#   v: the chunk's values; these are handed to parse.raw() as lines of CSV
#      text.
#
# Returns:
#   keyval(keys, rows): `rows` is a data frame of sampled records and `keys`
#   holds, for each row, the index (1..num.models) of the model it belongs to.
poisson.subsample <- function(k, v) {
  # The original code called paste(v, sep="\n"), which on a single vector only
  # coerces to character (no joining happens), so coerce directly.
  input <- parse.raw(as.character(v))
  n.rows <- nrow(input)
  row.ids <- seq_len(n.rows)

  # Draw the sample for model i from the current chunk.
  generate.sample <- function(i) {
    # one Poisson draw per row: how many copies of that row go into sample i
    draws <- rpois(n = n.rows, lambda = frac.per.model)
    # rep() with a per-element `times` repeats each row index by its draw;
    # rows with a draw of 0 drop out
    indices <- rep(row.ids, times = draws)
    # plain list rather than an intermediate keyval object, so the reshaping
    # below does not depend on how rmr2 represents keyval pairs internally
    list(key = rep(i, length(indices)),
         val = input[indices, , drop = FALSE])
  }

  # seq_len() instead of 1:num.models so num.models == 0 yields no samples
  samples <- lapply(seq_len(num.models), generate.sample)

  # Reshape into the single keyval pair RHadoop expects. Author's note:
  # RHadoop rbinds the data frames coming from different mappers together
  # for the reducer.
  output.keys <- do.call(c, lapply(samples, function(s) s$key))
  output.vals <- do.call(rbind, lapply(samples, function(s) s$val))
  keyval(output.keys, output.vals)
}

# REDUCE function
#
# Fits a 10-tree random forest on the rows collected for one key.
#
# Args:
#   k: the key for this group of values.
#   v: the values for that key. Author's note: rmr rbinds the emitted values,
#      so v is a data frame.
#
# Returns:
#   keyval(k, list(forest = <fitted randomForest>)).
fit.trees <- function(k, v) {
  # Author's note: do.trace = TRUE makes the fit write progress output to
  # stderr, which keeps the reduce task from timing out.
  forest.fit <- randomForest(
    formula = model.formula,
    data = v,
    ntree = 10,
    do.trace = TRUE,
    na.action = na.roughfix
  )
  # The fitted forest is itself a list and keyval is vectorized, so wrap it
  # in one more list to make sure exactly one object is emitted.
  keyval(k, list(forest = forest.fit))
}

# Run the job: sample in the mappers, fit one forest per key in the reducers,
# and write the result to /poisson/output.
mapreduce(
  input = "/poisson/training.csv",
  input.format = "text",
  output = "/poisson/output",
  map = poisson.subsample,
  reduce = fit.trees
)

# Read the job output back and keep only the values (the fitted forests).
forests <- from.dfs("/poisson/output")$val
68 changes: 68 additions & 0 deletions src/joindata.R
@@ -0,0 +1,68 @@
# Join transaction and machine tables
library(stringr)

# Strings that the read.table() calls below treat as missing (NA).
na.values <- c(
  "",
  "None or Unspecified",
  "Unspecified"
)

# set up some facilities for parsing different columns
# Convert feet-and-inches strings to a number of inches.
#
# The first two runs of digits in each string are read as feet and inches,
# e.g. "9' 6\"" -> 9 * 12 + 6 = 114.
#
# Args:
#   from: character vector of heights.
#
# Returns:
#   A numeric vector the same length as `from`. Entries with fewer than two
#   digit runs (including NA) yield NA. Empty input yields numeric(0).
parse.height <- function(from) {
  # sapply() would return list() for empty input; keep the result numeric
  if (length(from) == 0L) {
    return(numeric(0))
  }
  # base-R equivalent of stringr::str_extract_all(from, "[0-9]+")
  digit.runs <- regmatches(from, gregexpr("[0-9]+", from))
  parts <- lapply(digit.runs, as.numeric)
  # vapply() guarantees exactly one numeric value per input element
  vapply(parts, function(p) p[1] * 12 + p[2], numeric(1))
}

# Character -> custom class coercions. Class names registered here can be used
# as colClasses entries in read.table(), which converts such columns with as().

# Year-only strings such as "1996". as.Date() with format "%Y" alone fills in
# the *current* month and day, making the result depend on when the script
# runs; anchor to January 1st instead. (This class is not referenced by the
# read.table() calls in this script.)
setAs("character", "custom.date.1", function(from) {
  as.Date(paste0(from, "-01-01"), format = "%Y-%m-%d")
})

# Timestamps such as "11/16/2006 0:00"; only the date part is kept.
setAs("character", "custom.date.2", function(from) {
  as.Date(from, format = "%m/%d/%Y %H:%M")
})

# Strip trailing spaces / double quotes, then convert to numeric.
setAs("character", "tire.size", function(from) {
  as.numeric(gsub("[ \"]*$", "", from))
})

# Strip a trailing unit (letters, spaces, double quotes), then convert to
# numeric.
setAs("character", "undercarriage", function(from) {
  as.numeric(gsub("[ a-zA-Z\"]*$", "", from))
})
# Register parse.height as the character -> "stick.length" coercion so that
# "stick.length" can be used as a colClasses entry.
setAs(from = "character", to = "stick.length", def = parse.height)

# Load the transaction (sales) table. The first column supplies the row names.
# Columns not listed in colClasses are type-guessed by read.table(); the
# custom class names refer to the setAs() coercions registered in this script.
transactions <- read.table(
  file = "~/Downloads/Train.csv",
  header = TRUE,
  row.names = 1,
  sep = ",",
  quote = "\"",
  fill = TRUE,
  na.strings = na.values,
  colClasses = c(
    MachineID = "factor",
    ModelID = "factor",
    datasource = "factor",
    # kept as character here; converted to integer after the derived
    # features that need it have been computed
    YearMade = "character",
    # SalesID="character",
    auctioneerID = "factor",
    UsageBand = "factor",
    saledate = "custom.date.2",
    Tire_Size = "tire.size",
    Undercarriage_Pad_Width = "undercarriage",
    Stick_Length = "stick.length"
  )
)

# Load the machine appendix table. Only the three ID columns get an explicit
# type; everything else is type-guessed by read.table().
machines <- read.table(
  file = "~/Downloads/Machine_Appendix.csv",
  header = TRUE,
  sep = ",",
  quote = "\"",
  fill = TRUE,
  na.strings = na.values,
  colClasses = c(
    MachineID = "character",
    ModelID = "factor",
    fiManufacturerID = "factor"
  )
)

# Add a few derived features to the transaction data.

# sale date as a plain number
transactions$saledatenumeric <- as.numeric(transactions$saledate)

# Age at sale, in days since January 1st of the year of manufacture.
# as.Date(x, format = "%Y") fills the missing month/day with *today's* values,
# which made this feature depend on the day the script was run; anchoring the
# year to January 1st makes it reproducible.
year.made.start <- as.Date(paste0(transactions$YearMade, "-01-01"),
                           format = "%Y-%m-%d")
transactions$ageAtSale <- as.numeric(transactions$saledate - year.made.start)

# calendar components of the sale date
transactions$saleYear <- as.numeric(format(transactions$saledate, "%Y"))
transactions$saleMonth <- as.factor(format(transactions$saledate, "%B"))
transactions$saleDay <- as.factor(format(transactions$saledate, "%d"))
transactions$saleWeekday <- as.factor(format(transactions$saledate, "%A"))

# YearMade was read as character (needed above); store it as integer from here
transactions$YearMade <- as.integer(transactions$YearMade)

# Per-model aggregates broadcast back onto every transaction of that model:
# the median sale price and the number of transactions. The split is computed
# once and shared by both features.
price.by.model <- split(transactions$SalePrice, transactions$ModelID)
transactions$MedianModelPrice <- unsplit(lapply(price.by.model, median),
                                         transactions$ModelID)
transactions$ModelCount <- unsplit(lapply(price.by.model, length),
                                   transactions$ModelID)

# Join each transaction with its machine record on MachineID. Only rows with
# a match in both tables are kept (merge's default), and non-key columns
# present in both tables receive merge's default ".x" / ".y" suffixes.
training.data <- merge(transactions, machines, by = "MachineID")

# Write the denormalized (joined) data as comma-separated text: no header
# line, no row names, quoted character/factor fields, "\n" line endings.
write.table(
  training.data,
  file = "~/temp/training.csv",
  sep = ",",
  eol = "\n",
  quote = TRUE,
  col.names = FALSE,
  row.names = FALSE
)

0 comments on commit eb1a3cd

Please sign in to comment.