## Initialize the Spark and SparkSQL contexts

In [None]:
library(SparkR)
# Start Spark in local mode (master "local[*]") and keep the context handle.
sc <- sparkR.init(master = "local[*]")

# The SQL context is what every DataFrame-loading call below is given.
sqlCtx <- sparkRSQL.init(sc)

## Load the 3 datasets from Parquet files into SparkR as DataFrames
- **txnsRaw**: Transaction data organized by line-item
- **demo**: Demographic data, organized by customer ID
- **sample**: A subset of customers who received a DM offer. Its `respondYes` column is our target variable.

In [None]:
# All three Parquet files are read from the current working directory.

# Line-item transaction data.
txnsRaw <- loadDF(
  sqlCtx,
  paste0(getwd(), "/Customer_Transactions.parquet"),
  source = "parquet"
)

# Demographic data; its `cust_id` key is renamed to `ID` so that it does not
# share a column name with the other tables' `cust_id` in the joins below.
demo <- withColumnRenamed(
  loadDF(
    sqlCtx,
    paste0(getwd(), "/Customer_Demographics.parquet"),
    source = "parquet"
  ),
  "cust_id",
  "ID"
)

# Customers in the DM sample.
sample <- loadDF(
  sqlCtx,
  paste0(getwd(), "/DM_Sample.parquet"),
  source = "parquet"
)

## We can view the schema of any DataFrame with `printSchema()`:

In [None]:
# Print the schema of the raw transactions DataFrame.
printSchema(txnsRaw)

## And take a peek at it with `head()`:

In [None]:
# Show the first few rows of the raw transactions DataFrame.
head(txnsRaw)

## Now we need to generate a few measures for customer behavior to use in our model:

- **txns**: The number of transactions per customer (counted as distinct `day_num` values, so line items sharing a `day_num` count once)
- **spend**: Total expenditure per customer


In [None]:
# Collapse the line-item transactions to one row per customer:
#   txns  - number of distinct day_num values for that customer
#   spend - sum of extended_price for that customer
perCustomer <- agg(
  groupBy(txnsRaw, "cust_id"),
  txns = countDistinct(txnsRaw$day_num),
  spend = sum(txnsRaw$extended_price)
)

head(perCustomer)

## Next, we'll need to grab the demographic data for all our customers:

In [None]:
# First attempt at attaching demographics to the per-customer measures.
# NOTE: join() is called without a join expression here; this is kept as-is
# because the following cells inspect the resulting plan and then correct it.
joinToDemo <- select(
  join(perCustomer, demo),
  demo$"*",
  perCustomer$txns,
  perCustomer$spend
)

## So, what does this join actually look like?

In [None]:
# Print the execution plan Spark has built for joinToDemo.
explain(joinToDemo)

## What's wrong with this?

In [None]:
# Same selection as before, but now the join is keyed on the customer ID
# (perCustomer$cust_id against the renamed demo$ID).
joinToDemo <- select(
  join(perCustomer, demo, perCustomer$cust_id == demo$ID),
  demo$"*",
  perCustomer$txns,
  perCustomer$spend
)

# Print the plan again to compare it with the un-keyed join.
explain(joinToDemo)

## Now that we've got all our variables prepared, we need to create separate training and estimation sets.

In [None]:
# Training set: customers that appear in `sample`, carrying every column of
# joinToDemo plus the respondYes column from the sample.
trainDF <- select(
  join(joinToDemo, sample, joinToDemo$ID == sample$cust_id),
  joinToDemo$"*",
  sample$respondYes
)

printSchema(trainDF)

# Estimation set: customers that do NOT appear in `sample`. A left outer join
# keeps every joinToDemo row; rows with no match have a NULL sample cust_id,
# so filtering on that leaves exactly the unsampled customers.
estDF <- select(
  filter(
    join(joinToDemo, sample, joinToDemo$ID == sample$cust_id, "left_outer"),
    "cust_id IS NULL"
  ),
  joinToDemo$"*"
)

# Mark estDF for in-memory caching, then count its rows.
persist(estDF, "MEMORY_ONLY")
count(estDF)

## Now that we've got our data prepped and pared down, we can turn each SparkSQL DataFrame into an R `data.frame`:

In [None]:
# Bring the training rows into local R memory and drop the ID column.
# The model below uses every remaining column as a predictor (`~ .`), so the
# customer identifier must not be among them.
train <- collect(trainDF)
train$ID <- NULL

# The estimation rows keep their ID so scores can be reported per customer.
est <- collect(estDF)

## How do we go from Spark to R?

In [None]:
# Inspect the collected estimation set: its R class, ...
class(est)

# ... its column names, ...
names(est)

# ... and a per-column summary.
summary(est)

## Now that we've transitioned to pure R, we can use one of its strongest features: the modeling functions.

In [None]:
# Fit a binomial GLM of respondYes on every other column of `train`.
theModel <- glm(
  formula = respondYes ~ .,
  family = "binomial",
  data = train
)

summary(theModel)

## Finally, let's create a custom scoring function that will use R's `predict` method and also output the scores by customer ID.

In [None]:
# Score `data` with a fitted model and return the scores alongside the IDs.
#
# Args:
#   modObj:  A fitted model whose predict() method accepts
#            `type = "response"` and `se.fit = TRUE` (e.g. a glm object).
#   data:    A data.frame holding the ID column(s) and the predictor columns.
#   idField: Name(s) of the ID column(s) in `data`; these are excluded from
#            the data handed to predict().
#
# Returns:
#   A data.frame with the ID column(s) (under their original names) and a
#   `Score` column, sorted by descending Score.
predictWithID <- function(modObj, data, idField) {
  idField <- as.character(idField)

  # Fail early with a clear message instead of after the prediction has run.
  missingIds <- setdiff(idField, names(data))
  if (length(missingIds) > 0) {
    stop(
      "ID column(s) not found in data: ",
      paste(missingIds, collapse = ", "),
      call. = FALSE
    )
  }

  # drop = FALSE keeps a data.frame even when a single predictor column
  # remains; otherwise the result collapses to a bare vector and predict()
  # cannot look the variables up by name.
  scoringData <- data[, !(names(data) %in% idField), drop = FALSE]
  scores <- predict(modObj, scoringData, type = "response", se.fit = TRUE)

  idScores <- data.frame(ID = data[idField], Score = scores$fit)
  idScores[order(-idScores$Score), ]
}

# Score the estimation set, using its "ID" column as the customer identifier.
testScores <- predictWithID(theModel, est, "ID")

In [None]:
# Show the first 25 rows of the scores (predictWithID sorts them by descending Score).
head(testScores, 25)