In [17]:
# Adapted from http://blog.learningtree.com/machine-learning-using-spark-r/
# Install these packages

library(readr)  
library(dplyr)

In [18]:
# Prepare data
url <- "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv"  
df <-  
  read.csv(file=url, header=TRUE, sep=";") %>%
#  read_delim(url, delim = ";") %>%  
  dplyr::mutate(taste = as.factor(ifelse(quality < 6, "bad", ifelse(quality > 6, "good", "average")))) %>%  
  dplyr::select(-quality)  
df <- dplyr::mutate(df, id = as.integer(rownames(df)))

In [19]:
# This dataset contains the following features:
#
# fixed acidity
# volatile acidity
# citric acid
# residual sugar
# chlorides
# free sulfur dioxide
# total sulfur dioxide
# density
# pH
# sulphates
# alcohol
# quality (score between 0 and 10)

In [21]:
# Connect to Spark cluster
Sys.setenv(SPARK_HOME="/home/jovyan/spark")
library(SparkR, lib.loc=c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master="spark://172.30.63.205:7077",                     
               sparkConfig = list(spark.driver.memory="2g")
              )


Attaching package: ‘SparkR’

The following objects are masked from ‘package:dplyr’:

    arrange, between, collect, contains, count, cume_dist, dense_rank,
    desc, distinct, explain, filter, first, group_by, intersect, lag,
    last, lead, mutate, n, n_distinct, ntile, percent_rank, rename,
    row_number, sample_frac, select, sql, summarize, union

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var, window

The following objects are masked from ‘package:base’:

    as.data.frame, colnames, colnames<-, drop, endsWith, intersect,
    rank, rbind, sample, startsWith, subset, summary, transform, union

Spark package found in SPARK_HOME: /home/jovyan/spark


Launching java with spark-submit command /home/jovyan/spark/bin/spark-submit   --driver-memory "2g" sparkr-shell /tmp/Rtmpc8ddj8/backend_port194d05d637 


Java ref type org.apache.spark.sql.SparkSession id 1 

In [22]:
# Create Spark distributed dataframe
#
# SparkR doesn’t use normal data frames. 
# It makes use of a distributed data frame that can be spread across the nodes of the cluster. 
# This would normally be loaded from distributed storage such as HDFS or Amazon S3. 
# However, we’ll just convert our small wine data frame to a distributed data frame.

ddf <- createDataFrame(df)

“Use total_sulfur_dioxide instead of total.sulfur.dioxide  as column name”

In [23]:
head(ddf)

fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,pH,sulphates,alcohol,taste,id
7.0,0.27,0.36,20.7,0.045,45,170,1.001,3.0,0.45,8.8,average,1
6.3,0.3,0.34,1.6,0.049,14,132,0.994,3.3,0.49,9.5,average,2
8.1,0.28,0.4,6.9,0.05,30,97,0.9951,3.26,0.44,10.1,average,3
7.2,0.23,0.32,8.5,0.058,47,186,0.9956,3.19,0.4,9.9,average,4
7.2,0.23,0.32,8.5,0.058,47,186,0.9956,3.19,0.4,9.9,average,5
8.1,0.28,0.4,6.9,0.05,30,97,0.9951,3.26,0.44,10.1,average,6


In [24]:
# Split this into training (70%) and test (30%) datasets
#
# Note that Spark has not actually computed these datasets yet. 
# It’s lazily building up an execution plan that will be executed when the data is required.

seed <- 12345  
training_ddf <- sample(ddf, withReplacement=FALSE, fraction=0.7, seed=seed)  
test_ddf <- except(ddf, training_ddf)

In [25]:
# Train model to predict "taste"

# Random forests or random decision forests are an ensemble learning method for classification, 
# regression and other tasks, that operate by constructing a multitude of decision trees at 
# training time and outputting the class that is the mode of the classes (classification) or 
# mean prediction (regression) of the individual trees. Random decision forests correct for 
# decision trees' habit of overfitting to their training set.

model <- spark.randomForest(training_ddf, taste ~ ., type="classification", seed=seed)

In [26]:
# Examine model
summary(model)

Formula:  taste ~ .
Number of features:  12
Features:  fixed_acidity volatile_acidity citric_acid residual_sugar chlorides free_sulfur_dioxide total_sulfur_dioxide density pH sulphates alcohol id
Feature importances:  (12,[0,1,2,3,4,5,6,7,8,9,10,11],[0.026943430629333697,0.16408902163273212,0.03939055343761767,0.046731688718392846,0.13267612143586158,0.08488422363641222,0.05233570056053707,0.12694879829017308,0.034233266956816645,0.027467654882580052,0.23069808094155558,0.033601458877987454])
Number of trees:  20
Tree weights:  1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 RandomForestClassificationModel (uid=rfc_15b54a3f51f7) with 20 trees
  Tree 0 (weight 1.0):
    If (feature 6 <= 172.0)
     If (feature 5 <= 14.0)
      If (feature 4 <= 0.037)
       If (feature 3 <= 1.3)
        If (feature 1 <= 0.34)
         Predict: 1.0
        Else (feature 1 > 0.34)
         Predict: 1.0
       Else (feature 3 > 1.3)
        If (feature 8 <= 3.17)
         Predict: 0.0
        Else (feature 8 > 3.

In [27]:
# Use model to make predictions
# 
# We can use our newly trained model to make some predictions on the test dataset. 
# These can be retrieved as a normal (non-distributed) data frame.

predictions <- predict(model, test_ddf)  
prediction_df <- collect(select(predictions, "id", "prediction"))

In [28]:
# Now let’s join the actual “taste” scores to the predicted scores and see whether our model is accurate.

actual_vs_predicted <-  
 dplyr::inner_join(df, prediction_df, "id") %>%  
 dplyr::select(id, actual = taste, predicted = prediction)

In [29]:
mean(actual_vs_predicted$actual == actual_vs_predicted$predicted)
table(actual_vs_predicted$actual, actual_vs_predicted$predicted)

         
          average bad good
  average     461 163   57
  bad         166 340    6
  good        171   8  120

In [31]:
# If we want to use this model in future we can save it so we don’t have to retrain it every time.

# model_file_path <- "/home/jovyan/wine_random_forest_model"  
# write.ml(model, model_file_path)  
# saved_model <- read.ml(model_file_path)  
# summary(saved_model)

In [32]:
sparkR.session.stop()