## Dependencies

This Notebook assumes you have an Accumulo cluster running and have a shaded jar file with accumulo dependencies.
- To setup a simple Accumulo cluster: https://github.com/apache/fluo-uno
- To build a shaded jar with accumulo dependencies: https://github.com/apache/accumulo-examples/tree/master/spark

Adding the shaded jar after the notebook is running does not work, it needs to be added prior to starting jupyter

```
SPARK_OPTS="--jars ~/repos/accumulo-examples/spark/target/accumulo-spark-shaded.jar" jupyter notebook
```

In [1]:
// Adding the MMLSpark JAR dynamically does work
%AddJar file:///home/scgraham/repos/mmlspark/target/scala-2.11/mmlspark_2.11-0.17-159-6b4d8bfb-20190912-0312-SNAPSHOT.jar

Starting download from file:///home/scgraham/repos/mmlspark/target/scala-2.11/mmlspark_2.11-0.17-159-6b4d8bfb-20190912-0312-SNAPSHOT.jar


Waiting for a Spark session to start...

Finished download of mmlspark_2.11-0.17-159-6b4d8bfb-20190912-0312-SNAPSHOT.jar


Waiting for a Spark session to start...

In [2]:
%AddDeps org.apache.spark spark-avro_2.11 2.4.0

Marking org.apache.spark:spark-avro_2.11:2.4.0 for download
Obtained 2 files


In [3]:
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._

import org.apache.accumulo.core.client.{Accumulo, AccumuloClient, BatchWriter}
import org.apache.accumulo.core.data.{Key, Mutation, Value}
import org.apache.accumulo.core.security.Authorizations

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructType, StructField}

## Setup Accumulo Tables

In [4]:
val inputTable = "spark_example_input"
val outputTable = "spark_example_output"
val rootPath = new Path("/spark_example/")

inputTable = spark_example_input
outputTable = spark_example_output
rootPath = /spark_example


/spark_example

In [5]:
def accumuloClientProperties = "/home/scgraham/repos/fluo-uno/install/accumulo-2.0.0/conf/accumulo-client.properties"

accumuloClientProperties: String


In [6]:
// create accumulo client
val props = Accumulo.newClientProperties().from(accumuloClientProperties).build();
val client = Accumulo.newClient().from(props).build()

val hdfs = FileSystem.get(new Configuration());

props = {auth.type=password, auth.principal=root, instance.zookeepers=localhost:2181, instance.name=uno, auth.token=secret}
client = org.apache.accumulo.core.clientImpl.ClientContext@2d439cab
hdfs = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_104873185_46, ugi=scgraham (auth:SIMPLE)]]


DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_104873185_46, ugi=scgraham (auth:SIMPLE)]]

In [7]:
// cleanup hdfs root path
if (hdfs.exists(rootPath)) {
  hdfs.delete(rootPath, true);
}

// reset accumulo tables
Seq(inputTable, outputTable).foreach(table => {
    if (client.tableOperations().exists(table)) {
      client.tableOperations().delete(table)
    }
    client.tableOperations().create(table)
})

In [8]:
import java.util.TreeSet
import org.apache.hadoop.io.Text
val splits = new TreeSet[Text]()
splits.add(new Text("0025"))
splits.add(new Text("0050"))
splits.add(new Text("0075"))

client.tableOperations().addSplits(inputTable, splits)

splits = [0025, 0050, 0075]


[0025, 0050, 0075]

In [9]:
// write data to input table
val batchWriter = client.createBatchWriter(inputTable)
for (i <- 0 until 100) {
  val m = new Mutation(f"$i%03d")
  m.at().family("cf1").qualifier("cq1").put("row_" + i);
  batchWriter.addMutation(m);
}
batchWriter.close()

batchWriter = org.apache.accumulo.core.clientImpl.BatchWriterImpl@12198766


org.apache.accumulo.core.clientImpl.BatchWriterImpl@12198766

In [10]:
val scanner = client.createScanner(inputTable, Authorizations.EMPTY)
scanner.fetchColumnFamily("cf1")

scanner = org.apache.accumulo.core.clientImpl.ScannerImpl@6657b0e6


org.apache.accumulo.core.clientImpl.ScannerImpl@6657b0e6

In [11]:
scanner.zipWithIndex.foreach { case(e, i) => if (i < 10) println(e)}

000 cf1:cq1 [] 1568258494560 false=row_0
001 cf1:cq1 [] 1568258494560 false=row_1
002 cf1:cq1 [] 1568258494560 false=row_2
003 cf1:cq1 [] 1568258494560 false=row_3
004 cf1:cq1 [] 1568258494560 false=row_4
005 cf1:cq1 [] 1568258494560 false=row_5
006 cf1:cq1 [] 1568258494560 false=row_6
007 cf1:cq1 [] 1568258494560 false=row_7
008 cf1:cq1 [] 1568258494560 false=row_8
009 cf1:cq1 [] 1568258494560 false=row_9


## Spark - Accumulo Reader

In [12]:
// reload Spark session with config

// KryoSerializer is needed for serializing Accumulo Key when partitioning data for bulk import
val conf = new SparkConf()
    .setAppName("AccumuloDataSourceExample")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .registerKryoClasses(Array(classOf[Key], classOf[Value], classOf[Properties]))

val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

conf = org.apache.spark.SparkConf@31faa778
spark = org.apache.spark.sql.SparkSession@155fb1e7


org.apache.spark.sql.SparkSession@155fb1e7

In [13]:
props.put("table", inputTable)
props.put("maxPartitions", "200")
val properties = props.asScala

properties = Map(auth.type -> password, maxPartitions -> 200, auth.principal -> root, table -> spark_example_input, instance.zookeepers -> localhost:2181, instance.name -> uno, auth.token -> secret)


Map(auth.type -> password, maxPartitions -> 200, auth.principal -> root, table -> spark_example_input, instance.zookeepers -> localhost:2181, instance.name -> uno, auth.token -> secret)

In [14]:
val schema = new StructType().add("cf1", new StructType().add("cq1", StringType, true), false)

schema = StructType(StructField(cf1,StructType(StructField(cq1,StringType,true)),false))


StructType(StructField(cf1,StructType(StructField(cq1,StringType,true)),false))

In [15]:
val df = spark
    .read
    .format("com.microsoft.ml.spark.accumulo")
    .options(properties)
    .schema(schema)
    .load()

df.show(10)

+-------+
|    cf1|
+-------+
|[row_0]|
|[row_1]|
|[row_2]|
|[row_3]|
|[row_4]|
|[row_5]|
|[row_6]|
|[row_7]|
|[row_8]|
|[row_9]|
+-------+
only showing top 10 rows



df = [cf1: struct<cq1: string>]


[cf1: struct<cq1: string>]

In [16]:
df.count

100

In [17]:
props.put("table", outputTable)

spark_example_input

In [18]:
df.write
  .format("com.microsoft.ml.spark.accumulo")
  .options(properties)
  .save

Name: org.apache.spark.SparkException
Message: Writing job aborted.
StackTrace:   at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.

In [33]:
val row = df.take(1)(0)
row.getStruct(0).getString(0)

row = [[row_0]]


row_0

In [34]:
row.fieldIndex

Name: Unknown Error
Message: <console>:69: error: missing argument list for method fieldIndex in trait Row
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `fieldIndex _` or `fieldIndex(_)` instead of `fieldIndex`.
       row.fieldIndex
           ^

StackTrace: 

In [29]:
// spark.stop