# Baseline Spark Model for AI on Accumulo

Before running this notebook, please
* create and activate a conda environment with Apache Toree installed
* build a shaded jar with Accumulo 2.0.0 and Spark 2.4.3 included
* clean the YARN cache on the cluster left over from previous runs (optional but recommended)
* run commands like the following to install the Jupyter Toree kernel
```
# Replace the jar file path based on your situation
JAR="file:///home/rba1/chenhui/build_jar/target/accumulo-spark-shaded.jar"
jupyter toree install \
    --replace \
    --user \
    --kernel_name=accumulo \
    --spark_home=${SPARK_HOME} \
    --spark_opts="--master yarn --jars $JAR \
        --packages com.microsoft.ml.spark:mmlspark_2.11:0.18.1 \
        --driver-memory 16g \
        --executor-memory 12g \
        --driver-cores 4 \
        --executor-cores 4 \
        --num-executors 128"
```

We train the model on Twitter data duplicated up to a target size and evaluate inference speed on the same dataset. In addition, we evaluate model accuracy by computing the AUC on a separate, manually labeled test set.

In [1]:
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.accumulo.core.data.{Key, Value}

import org.apache.spark.storage.StorageLevel

// Stop the kernel's existing Spark context and create a new one with the configuration below
sc.stop()

val conf = new SparkConf()
conf.setAppName("TwitterSentimentClassification")
// KryoSerializer is needed for serializing Accumulo Key when partitioning data for bulk import
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// NOTE(review): java.util.Properties is presumably registered because the Accumulo client
// properties are captured in closures shipped to executors (see dfToTable) — confirm.
conf.registerKryoClasses(Array(classOf[Key], classOf[Value], classOf[Properties]))
// Allow up to 3g of serialized results to be collected back to the driver
conf.set("spark.driver.maxResultSize", "3g")

// NOTE(review): the new context is not bound to a name; the `sc` used below (and in later
// cells) is assumed to resolve to this active context through the Toree kernel — confirm.
new SparkContext(conf)

// Print the Spark/Scala versions and the effective Spark configuration
println("Spark version %s".format(sc.version))
println("Scala %s".format(util.Properties.versionString))
println
sc.getConf.getAll.foreach(println)

Waiting for a Spark session to start...

Spark version 2.4.3
Scala version 2.11.12



Waiting for a Spark session to start...

(spark.kryo.classesToRegister,org.apache.accumulo.core.data.Key,org.apache.accumulo.core.data.Value,java.util.Properties)
(spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS,32nodecluster-0)
(spark.driver.port,38805)
(spark.driver.host,32nodecluster-0)
(spark.eventLog.enabled,true)
(spark.driver.appUIAddress,http://32nodecluster-0:4040)
(spark.repl.local.jars,file:///home/rba1/webscale-ai-test/target/accumulo-spark-shaded.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,file:///home/rba1/.ivy2/jars/org.scalactic_scalactic_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/org.scalatest_scalatest_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/io.spray_spray-json_2.11-1.3.2.jar,file:///home/rba1/.ivy2/jars/com.microsoft.cntk_cntk-2.4.jar,file:///home/rba1/.ivy2/jars/org.openpnp_opencv-3.2.0-1.jar,file:///home/rba1/.ivy2/jars/com.jcraft_jsch-0.1.54.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpclient-4.5.6.jar,file:///

conf = org.apache.spark.SparkConf@216c2dbb


org.apache.spark.SparkConf@216c2dbb

In [2]:
import java.nio.ByteBuffer
import java.util.{ArrayList, Collections, List}

import org.apache.accumulo.core.client.{Accumulo, AccumuloClient, BatchWriter, MutationsRejectedException}
import org.apache.accumulo.core.data.{Key, Mutation, Value}
import org.apache.accumulo.hadoop.mapreduce.{AccumuloFileOutputFormat, AccumuloInputFormat}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.Partitioner
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{expr, first, max, min, rand, when, lit, col, explode, concat, array}
import org.apache.spark.sql.types.{LongType, DoubleType, StringType, StructField, StructType}

import scala.collection.mutable.ArrayBuffer

In [3]:
// Path of the Accumulo client properties file
val PROPS_PATH = "./accumulo-client.properties"
// Number of copies of the ~233 MB source data needed to reach each target size
val data_copies = Map(("1G", 5), ("10G", 44), ("100G", 440), ("1T", 4501))
// Target data size; must be one of the keys of data_copies
val DATA_SIZE = "1G"
val N_COPIES = data_copies(DATA_SIZE)
val TABLE_NAME = s"twitter$DATA_SIZE"
val N_SPLITS = 512
// Whether to (re)write the duplicated data into Accumulo before reading it back
val WRITE_DUP_DF = true
// Number of executors; used as the partition count for dfToTable()
val N_PARTS = conf.get("spark.executor.instances").toInt
// Featurizer label; used in the name of the saved results
val FEATURIZER = "token_counts"

PROPS_PATH = ./accumulo-client.properties
data_copies = Map(1G -> 5, 10G -> 44, 100G -> 440, 1T -> 4501)
DATA_SIZE = 1G
N_COPIES = 5
TABLE_NAME = twitter1G
N_SPLITS = 512
WRITE_DUP_DF = false
N_PARTS = 128
FEATURIZER = token_counts


token_counts

In [4]:
/** Pre-split an existing Accumulo table.
 *
 *  Split points are the zero-padded strings "0000", "0001", ... up to `n_splits - 1`
 *  (the Twitter row ids used in this notebook carry such a 4-digit prefix).
 *
 *  @param spark     Spark session (unused; kept for a uniform helper signature)
 *  @param n_splits  number of split points to add
 *  @param client    Accumulo client properties filepath
 *  @param table     Accumulo table name (must already exist)
 */
def addSplits(spark: SparkSession, n_splits: Int, client: String, table: String): Unit = {
    import scala.collection.JavaConverters._
    import java.util.TreeSet
    import org.apache.hadoop.io.Text
    val splits = new TreeSet[Text]((0 until n_splits).map(i => new Text(f"$i%04d")).asJava)
    val props = Accumulo.newClientProperties().from(client).build()
    val cl = Accumulo.newClient().from(props).build()
    // Close the client even when addSplits fails (e.g. the table does not exist)
    try cl.tableOperations().addSplits(table, splits)
    finally cl.close()
}

/** Print the split points of an Accumulo table.
 *
 *  @param spark   Spark session (unused; kept for a uniform helper signature)
 *  @param client  Accumulo client properties filepath
 *  @param table   Accumulo table name
 */
def getSplits(spark: SparkSession, client: String, table: String): Unit = {
    val props = Accumulo.newClientProperties().from(client).build()
    val cl = Accumulo.newClient().from(props).build()
    // Close the client even when listSplits fails
    try println(cl.tableOperations().listSplits(table))
    finally cl.close()
}

addSplits: (spark: org.apache.spark.sql.SparkSession, n_splits: Int, client: String, table: String)Unit
getSplits: (spark: org.apache.spark.sql.SparkSession, client: String, table: String)Unit


In [5]:
/** Load Spark DataFrame from Accumulo table
 *
 *  Each Accumulo entry contributes (row id, column family, value). Entries are grouped by
 *  row id (the "id" column) and column families are pivoted into columns. Column
 *  qualifiers are ignored; if a row has several entries for one family, `first` keeps an
 *  arbitrary one.
 *
 *  @param spark   Spark session
 *  @param client  Accumulo client properties filepath
 *  @param table   Accumulo table name
 *  @param colFs   Optional comma-separated column families to pivot into columns, in this
 *                 order. Empty (or blank) means Spark discovers them from the data, which
 *                 requires computing the distinct families first.
 *  @param timer   If true, cache and count each intermediate stage and print elapsed times
 *
 *  @return (DataFrame, read time in seconds, transform time in seconds). Both times are 0.0
 *          when `timer` is false.
 *  @throws IllegalArgumentException if the table does not exist
 */
def tableToDf(spark: SparkSession, client: String, table: String, colFs: String = "",
              timer: Boolean = false): (DataFrame, Double, Double) = {
    val t0 = System.nanoTime()
    // Seconds elapsed since the start of this call
    def elapsedSec(): Double = (System.nanoTime() - t0) * 1e-9

    import spark.implicits._
    val props = Accumulo.newClientProperties().from(client).build()

    // Fail fast on a missing table; the client is closed on every path, including the error
    val cl = Accumulo.newClient().from(props).build()
    try {
        if (!cl.tableOperations().exists(table)) {
            throw new IllegalArgumentException("Table %s does not exist.".format(table))
        }
    } finally {
        cl.close()
    }

    val job = Job.getInstance()
    AccumuloInputFormat.configure().clientProperties(props).table(table).store(job)

    val rawRDD = spark.sparkContext.newAPIHadoopRDD(
        job.getConfiguration,
        classOf[AccumuloInputFormat],
        classOf[Key],
        classOf[Value]
    )

    val rawRDD_time =
        if (timer) {
            rawRDD.cache().count()
            val t = elapsedSec()
            println("Time until rawRDD: " + t + "s")
            t
        } else {
            0.0
        }

    // One (row id, column family, value) triple per entry.
    // To get column qualifier, add k.getColumnQualifier().toString
    val rawDF = rawRDD.map { case (k, v) =>
        (k.getRow().toString, k.getColumnFamily().toString, v.toString)
    }.toDF("id", "colF", "value")

    if (timer) {
        rawDF.cache().count()
        println("Time until rawDF: " + elapsedSec() + "s")
    }

    // Explicit families (blank entries ignored) avoid Spark's extra distinct-values pass
    val families = colFs.split(",").map(_.trim).filter(_.nonEmpty).toSeq
    val grouped = rawDF.groupBy("id")
    val pivotDF =
        if (families.nonEmpty) grouped.pivot("colF", families).agg(first($"value"))
        else grouped.pivot("colF").agg(first($"value"))

    val trans_time =
        if (timer) {
            pivotDF.cache().count()
            val pivotDF_time = elapsedSec()
            val t = pivotDF_time - rawRDD_time
            println("Time until pivotDF: " + pivotDF_time + "s")
            println("Data transform time: " + t + "s")
            t
        } else {
            0.0
        }

    // Release the intermediate caches (only populated when timer is true); pivotDF has
    // already been materialized at that point
    rawRDD.unpersist()
    rawDF.unpersist()

    (pivotDF, rawRDD_time, trans_time)
}

tableToDf: (spark: org.apache.spark.sql.SparkSession, client: String, table: String, colFs: String, timer: Boolean)(org.apache.spark.sql.DataFrame, Double, Double)


In [6]:
/** Inject Spark DataFrame into Accumulo table.
 *  DataFrame should include a column named "id" which will be used as the table's row id.
 *  The row id is the string form of the "id" value, so Long and String ids both work.
 *  Every other column is cast to String and written under a column family of the same name
 *  with an empty qualifier. Null cells are skipped; after tableToDf's pivot they read back
 *  as null.
 *
 *  The table is pre-split with addSplits using the notebook-level N_SPLITS.
 *
 *  @param spark       Spark session
 *  @param df          Spark DataFrame
 *  @param numParts    Number of partitions to use. Should be NUM-EXECUTORS * n for the best performance
 *  @param client      Accumulo client properties filepath
 *  @param table       Accumulo table name
 *  @param defaultFS   Hadoop FileSystem url for "bulk" import (reserved; bulk is not implemented)
 *  @param writeMode   Write mode - only "batch" is implemented
 *  @param insertMode  Insert mode - "replace" (delete existing table if exist). Any other value
 *                     ("overwrite" / "into") writes into the existing table, creating it if it
 *                     is missing (TODO: the two modes are not distinguished yet).
 *  @throws IllegalArgumentException if `df` has no "id" column or `writeMode` is unsupported.
 *                                   Both checks run before the table is touched.
 */
def dfToTable(spark: SparkSession, df: DataFrame, numParts: Int, client: String, table: String,
              defaultFS: String = "", writeMode: String = "batch", insertMode: String = "replace"): Unit = {
    // Validate arguments first so that a bad call can never delete an existing table
    if (!(df.columns contains "id")) {
        throw new IllegalArgumentException("Input DataFrame should have 'id' column.")
    }
    if (writeMode != "batch") {
        // TODO implement "bulk" import (defaultFS is reserved for it)
        throw new IllegalArgumentException("Write mode should be 'batch' ('bulk' is not implemented yet)")
    }

    import spark.implicits._
    val sc = spark.sparkContext

    // Create/replace the table; the driver-side client is closed even if this fails
    val props = Accumulo.newClientProperties().from(client).build()
    val cl = Accumulo.newClient().from(props).build()
    try {
        val ops = cl.tableOperations()
        if (insertMode == "replace") {
            // Replace the existing table (if any)
            if (ops.exists(table)) ops.delete(table)
            ops.create(table)
        } else if (!ops.exists(table)) {
            ops.create(table)
        }
    } finally {
        cl.close()
    }

    // Pre-split the table
    addSplits(spark, N_SPLITS, client, table)

    // Value columns: everything except "id"
    val schema = df.schema.filter(_.name != "id")

    // Convert columns to String except "id". Double type "id" will be used by bulk import's rangePartition
    val dfStr = schema.filter(_.dataType != StringType).foldLeft(df) {
        case (acc, field) => acc.withColumn(field.name, acc(field.name).cast(StringType))
    }

    val bcSchema = sc.broadcast(schema)
    val dfParted = dfStr.repartition(numParts, $"id")
    dfParted.foreachPartition { partition =>
        // Intentionally created an Accumulo client for each partition to avoid attempting to
        // serialize it and send it to each remote process.
        val partClient = Accumulo.newClient().from(props).build()
        try {
            val bw = partClient.createBatchWriter(table)
            try {
                partition.foreach { record =>
                    // String form of the id, whatever its type (fails loudly on a null id)
                    val m = new Mutation(record.getAs[Any]("id").toString)
                    bcSchema.value.foreach { s =>
                        val value = record.getAs[String](s.name)
                        // Never hand a null value to put(); an absent family reads back as null
                        if (value != null) {
                            // at() has been introduced since 2.0.0.
                            // Add .visibility(v).timestamp(t) if needed
                            m.at().family(s.name).qualifier("").put(value)
                        }
                    }
                    // A row whose cells are all null yields a mutation with no updates, which
                    // the BatchWriter refuses
                    if (m.size() > 0) {
                        try bw.addMutation(m)
                        catch {
                            case e: MutationsRejectedException => e.printStackTrace()
                        }
                    }
                }
            } finally {
                bw.close()
            }
        } finally {
            partClient.close()
        }
    }
}

dfToTable: (spark: org.apache.spark.sql.SparkSession, df: org.apache.spark.sql.DataFrame, numParts: Int, client: String, table: String, defaultFS: String, writeMode: String, insertMode: String)Unit


In [7]:
/** Print the names of all Accumulo tables visible to the client.
 *
 *  @param spark   Spark session (unused; kept for a uniform helper signature)
 *  @param client  Accumulo client properties filepath
 */
def tables(spark: SparkSession, client: String): Unit = {
    val props = Accumulo.newClientProperties().from(client).build()
    val cl = Accumulo.newClient().from(props).build()
    // Close the client even when listing fails
    try println(cl.tableOperations().list())
    finally cl.close()
}

tables: (spark: org.apache.spark.sql.SparkSession, client: String)Unit


## Load Twitter Data

In [8]:
/** Duplicate every row of `inDf` `nDup` times, giving each copy a unique id.
 *
 *  Copy k (1-based) of a row with id "abc" gets id "abc_k", i.e. "abc_1" ... "abc_<nDup>".
 *  The input must contain an "id" column. When `onlyID` is false it must also contain
 *  "sentiment", "date", "query_string", "user" and "text".
 *
 *  @param inDf    input DataFrame
 *  @param nDup    number of copies per row (must be >= 1)
 *  @param onlyID  if true, return only the "id" column
 *  @return the duplicated DataFrame
 *  @throws IllegalArgumentException if nDup < 1 (would otherwise silently yield no rows)
 */
def duplicateDf(inDf: DataFrame, nDup: Int, onlyID: Boolean = false) : DataFrame = {
    require(nDup >= 1, s"nDup must be at least 1, got $nDup")
    // Array literal [1, 2, ..., nDup]; exploding it emits one row per copy
    val copyIdx = array((1 to nDup).map(lit): _*)
    val result = inDf
        .withColumn("dup_idx", explode(copyIdx))
        .withColumn("id", concat(col("id"), lit("_"), col("dup_idx")))
    if (onlyID) result.select("id")
    else result.select("id", "sentiment", "date", "query_string", "user", "text")
}

duplicateDf: (inDf: org.apache.spark.sql.DataFrame, nDup: Int, onlyID: Boolean)org.apache.spark.sql.DataFrame


In [9]:
// Write duplicated twitter data into Accumulo
// Optionally duplicate the raw CSV data N_COPIES times and write it into TABLE_NAME, timing
// only the Accumulo write (the duplicated DataFrame is cached and counted beforehand).
var df_dup: DataFrame = _
if (WRITE_DUP_DF) {
    // define data schema; columns are matched to the CSV by position
    val schema = StructType(Array(
        StructField("sentiment", DoubleType),
        StructField("id", StringType),
        StructField("date", StringType),
        StructField("query_string", StringType),
        StructField("user", StringType),
        StructField("text", StringType)
    ))
    // need to upload data to hdfs first via 
    // hdfs dfs -put /home/rba1/chenhui/accumulo_ml_baseline/data/sentiment140.csv sentiment140.csv
    // NOTE(review): the command above uploads sentiment140.csv, but the path below reads
    // sentiment140_prefix.csv — confirm which file is expected on HDFS.
    val file_path = "sentiment140_prefix.csv"
    val df = spark.read.format("csv").schema(schema).load(file_path)
    
    // Materialize the duplicated data first so the timer below measures only the write
    df_dup = duplicateDf(df, N_COPIES)
    df_dup.cache().count()
    
    var t0 = System.nanoTime()
    dfToTable(
        spark,
        df=df_dup,
        numParts=N_PARTS,
        client=PROPS_PATH,
        table=TABLE_NAME
    )
    var t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0)*1e-9 + "s")
    
    df_dup.unpersist()
} else {
    println("Skip writing the duplicated dataframe into Accumulo!")
}

Skip writing the duplicated dataframe into Accumulo!


df_dup: org.apache.spark.sql.DataFrame = null


()

In [10]:
// Read the training data back from Accumulo, timing the scan and the row-to-DataFrame pivot
println("Reading data from Accumulo...")
val (df_all, read_time, trans_time) =
    tableToDf(spark, client = PROPS_PATH, table = TABLE_NAME, timer = true)

Reading data from Accumulo...
Time until rawRDD: 8.922607352s
Time until rawDF: 14.490108339s
Time until pivotDF: 23.639355363s
Data transform time: 14.716748011s


df_all = [id: string, date: string ... 4 more fields]
read_time = 8.922607352
trans_time = 14.716748011


14.716748011

In [11]:
// Number of rows read from Accumulo (one per row id)
df_all.count()

8000000

In [12]:
// Preview the first 10 rows (long strings are truncated in the display)
df_all.show(10)

+-----------------+--------------------+------------+---------+--------------------+--------------+
|               id|                date|query_string|sentiment|                text|          user|
+-----------------+--------------------+------------+---------+--------------------+--------------+
|0000_1467833736_2|Mon Apr 06 22:25:...|    NO_QUERY|      0.0|No new csi tonigh...|  MagicalMason|
|0000_1467856044_3|Mon Apr 06 22:31:...|    NO_QUERY|      0.0|Crazy wind today ...|   EcoTravelTV|
|0000_1467901839_5|Mon Apr 06 22:43:...|    NO_QUERY|      0.0|@CarVin1 lol they...|SummerJSanders|
|0000_1467917499_1|Mon Apr 06 22:48:...|    NO_QUERY|      0.0|@RandomlyNat Jeez...| VictoriaBahar|
|0000_1467931839_2|Mon Apr 06 22:52:...|    NO_QUERY|      0.0|@margaretcho what...|       dmurda6|
|0000_1467945704_2|Mon Apr 06 22:56:...|    NO_QUERY|      0.0|anyone else havin...|      amfairie|
|0000_1467987736_2|Mon Apr 06 23:08:...|    NO_QUERY|      0.0|   @polhillian YUP. |       roboppy|


## Prepare Training Data

In [13]:
// df_all only contains positive or negative tweets:
// df_all.select("sentiment").distinct().collect() returns Array([0.0], [4.0]).
// Build (label, text) rows with label = 1.0 when the integer sentiment is positive, else 0.0,
// after randomly permuting the data for online training.
val train_df = {
    val labeled = df_all
        .orderBy(rand())
        .select(
            when('sentiment.cast("Int") > 0, 1.0D).otherwise(0.0D).as("label"),
            'text as 'text
        )
    // Very large data on fewer executors: spread it over many more partitions
    if (N_PARTS < 128 && N_COPIES >= data_copies("100G")) labeled.repartition(8192)
    else labeled
}

println("Num train samples = %s".format(train_df.cache().count()))

Num train samples = 8000000


train_df = [label: double, text: string]


[label: double, text: string]

## Logistic Regression

In [14]:
import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.Pipeline

In [15]:
// Tokenizer: the pattern matches tokens (gaps = false), i.e. runs of Unicode letters
val tokenizer = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("words")
  .setPattern("\\p{L}+")
  .setGaps(false)

// Bag-of-words term counts with the default vocabulary settings
val countVectorizer = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")

// Logistic regression (default "features"/"label" columns), pure L2 penalty, one iteration
val lr = new LogisticRegression()
  .setElasticNetParam(0.0)
  .setRegParam(0.2)
  .setMaxIter(1)

tokenizer = regexTok_95065a4cc274
countVectorizer = cntVec_2ec09e7f3163
lr = logreg_5f28d787394f


logreg_5f28d787394f

In [16]:
// Evaluate model training speed - feature engineering
// Timeline: t0 -> fit the tokenizer + CountVectorizer pipeline -> t1 -> transform and
// materialize the training features -> t2
var t0 = System.nanoTime()

val feat_eng_pipeline = new Pipeline().setStages(Array(tokenizer, countVectorizer))
// Fitting learns the CountVectorizer vocabulary from train_df
val featEng = feat_eng_pipeline.fit(train_df)

var t1 = System.nanoTime()

// cache() + count() forces the transform so its cost is included in t2 - t1
var train_fea = featEng.transform(train_df)
train_fea.cache().count()

var t2 = System.nanoTime()
val feat_eng_time = (t2 - t1)*1e-9
val feat_eng_time_with_fit = (t2 - t0)*1e-9
println("Time to fit FE pipeline: " + (t1 - t0)*1e-9 + "s")
println("Time to get train_fea: " + feat_eng_time + "s")
println("Total time: " + feat_eng_time_with_fit + "s")
// train_fea is now cached, so the cached input is no longer needed
train_df.unpersist()

Time to fit FE pipeline: 5.442636145000001s
Time to get train_fea: 8.601079914000001s
Total time: 14.043716059000001s


t0 = 341791514517341
feat_eng_pipeline = pipeline_1b109d72b2f1
featEng = pipeline_1b109d72b2f1
t1 = 341796957153486
train_fea = [label: double, text: string ... 2 more fields]
t2 = 341805558233400
feat_eng_time = 8.601079914000001
feat_eng_time_with_fit = 14.043716059000001


[label: double, text: string]

In [17]:
// Evaluate model training speed - model training
// Very large data on fewer executors: repartition (outside the timed region) and release
// the previously cached, un-repartitioned features so they do not stay in memory
if ((N_PARTS < 128) && (N_COPIES >= data_copies("100G"))) {
    val unpartitioned = train_fea
    train_fea = train_fea.repartition(8192)
    train_fea.cache().count()
    unpartitioned.unpersist()
}

var t0 = System.nanoTime()

val lrModel = lr.fit(train_fea)

var t1 = System.nanoTime()
val train_time = (t1 - t0)*1e-9
println("Time to train lr model: " + train_time + "s")
train_fea.unpersist()

Time to train lr model: 7.546550775s


t0 = 341807773979052
lrModel = LogisticRegressionModel: uid = logreg_5f28d787394f, numClasses = 2, numFeatures = 262144
t1 = 341815320529827
train_time = 7.546550775


[label: double, text: string ... 2 more fields]

In [18]:
// Persist the fitted feature-engineering pipeline and the LR model (to HDFS, the default
// filesystem), one directory per data size; existing output is overwritten
featEng.write.overwrite().save(s"./model/lrFeatEng_$DATA_SIZE")
lrModel.write.overwrite().save(s"./model/lrModel_$DATA_SIZE")

## Save Results

In [19]:
import java.time.LocalDateTime
// Column names of the single results row
val columns = Seq("id", "read_time", "trans_time", "feat_eng_time", "feat_eng_time_with_fit", "train_time")
// Trial id: Spark application id plus the current local date-time
val trial_id = s"${sc.applicationId}_${LocalDateTime.now()}"
// All timings are in seconds
val results = Seq(
    (trial_id, read_time, trans_time, feat_eng_time, feat_eng_time_with_fit, train_time)
)

columns = List(id, read_time, trans_time, feat_eng_time, feat_eng_time_with_fit, train_time)
trial_id = application_1568922621130_0019_2019-09-23T18:39:05.992
results = List((application_1568922621130_0019_2019-09-23T18:39:05.992,8.922607352,14.716748011,8.601079914000001,14.043716059000001,7.546550775))


List((application_1568922621130_0019_2019-09-23T18:39:05.992,8.922607352,14.716748011,8.601079914000001,14.043716059000001,7.546550775))

In [20]:
// Turn the results tuple(s) into a DataFrame named by `columns` and display it
val df_results = results.toDF(columns: _*)
df_results.show()

+--------------------+-----------+------------+-----------------+----------------------+-----------+
|                  id|  read_time|  trans_time|    feat_eng_time|feat_eng_time_with_fit| train_time|
+--------------------+-----------+------------+-----------------+----------------------+-----------+
|application_15689...|8.922607352|14.716748011|8.601079914000001|    14.043716059000001|7.546550775|
+--------------------+-----------+------------+-----------------+----------------------+-----------+



df_results = [id: string, read_time: double ... 4 more fields]


[id: string, read_time: double ... 4 more fields]

In [21]:
// Name results by executor count, featurizer and data size
val results_tablename = "lr_train_results_%s_executors_%s_%s".format(N_PARTS, FEATURIZER, DATA_SIZE)
val results_filename = results_tablename + ".csv"

// Write results to CSV under ./results (consistent with ./model above). Spark writes a
// directory of part files at this path; the header keeps the column names.
df_results.write
    .mode("overwrite")
    .option("header", "true")
    .csv("./results/" + results_filename)

// Write results to Accumulo
dfToTable(
    spark,
    df=df_results,
    numParts=1,
    client=PROPS_PATH,
    table=results_tablename
)

results_tablename = lr_train_results_128_executors_token_counts_1G
results_filename = lr_train_results_128_executors_token_counts_1G.csv


lr_train_results_128_executors_token_counts_1G.csv