# Opaque Tutorial

Opaque is a package for Apache Spark SQL that enables analytics on sensitive data in an untrusted cloud. Opaque achieves this using Intel SGX trusted hardware enclaves, which make it possible to operate on encrypted data without revealing it to an attacker -- even one that controls the OS or hypervisor. Queries to Opaque are issued from a trusted client such as your laptop, which holds the encryption keys and verifies the trusted hardware.

Opaque queries are issued using Spark SQL's DataFrame API. Opaque supports a limited but growing subset of this API, allowing supported queries to be run securely with just a few code changes.

In this tutorial we will set up Opaque, learn to write supported queries over some sample data, and verify that Opaque is encrypting the data by inspecting the contents in memory.

For the purpose of the tutorial, we are running Spark in local mode, meaning the driver and the workers run in the same process and the SGX enclaves are only simulated. For a real deployment, the workers would run on a cluster with real SGX hardware.

This tutorial is written as a Jupyter notebook. You can execute a cell using `Shift+Enter`.

## Setting up Apache Spark and Opaque

This Jupyter notebook provides a Scala shell. To use Spark and Opaque, we need to import them. Ordinarily this would require specifying them as dependencies before launching the shell, but the notebook provides special syntax for loading the dependencies at runtime from the Ivy package manager:

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.0.2`
import $ivy.`edu.berkeley.cs.amplab::opaque:0.1`


import $ivy.$
import $ivy.$

Next we need to start a [`SparkSession`](https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkSession.html), which is the entry point to Spark SQL. The call to `setJars()` passes in the Opaque dependency, which is already built for you, so that if we were running on a cluster Spark could launch Opaque on the workers as well. The call to `master()` specifies that we are running in the current process with 1 worker thread.

In [2]:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.log4j._

LogManager.getLogger("org").setLevel(Level.WARN)

val spark = SparkSession.builder()
  .config(new SparkConf().setJars(Seq("opaque/target/scala-2.11/opaque_2.11-0.1.jar")))
  .appName("notebook")
  .master("local[1]")
  .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/10/12 18:49:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/10/12 18:49:55 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.


[32mimport [39m[36morg.apache.spark._
[39m
[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36morg.apache.log4j._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@2c81421e

Next we import some useful types from Spark and Opaque, and inject Opaque's extensions into Spark SQL using `initSQLContext()`.

In [3]:
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl._
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import spark.implicits._

import edu.berkeley.cs.rise.opaque.implicits._

edu.berkeley.cs.rise.opaque.Utils.initSQLContext(spark.sqlContext)

import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl._
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import spark.implicits._

import edu.berkeley.cs.rise.opaque.implicits._

## Creating an encrypted DataFrame

Now we can create our first DataFrame, specifying its contents inline for now. First we create `data`, a list of three tuples. Then we load those into `df`, a Spark SQL DataFrame with the columns `word` and `count`. Finally, we call `.encrypted`, a method that Opaque adds to DataFrames, to get an encrypted version of it, `dfEncrypted`.

In [4]:
val data = Seq(("foo", 4), ("bar", 1), ("baz", 5))
val df = spark.createDataFrame(data).toDF("word", "count")
val dfEncrypted = df.encrypted

[36mdata[39m: [32mSeq[39m[([32mString[39m, [32mInt[39m)] = [33mList[39m(([32m"foo"[39m, [32m4[39m), ([32m"bar"[39m, [32m1[39m), ([32m"baz"[39m, [32m5[39m))
[36mdf[39m: [32mDataFrame[39m = [word: string, count: int]
[36mdfEncrypted[39m: [32mDataFrame[39m = [word: string, count: int]

In [7]:
dfEncrypted

res6: DataFrame = [word: string, count: int]

Next, let's see how Opaque would execute a simple query over this DataFrame: a filter that keeps only the rows with `count > 3`, which drops `("bar", 1)`. Spark SQL's `.explain()` method on DataFrames prints the query plan without running the query; the `true` argument requests extended output.

The output of `.explain(true)` has four parts:
1. The Parsed Logical Plan shows a representation of the query as we entered it. The `LocalRelation` operator represents `data`, the `Project` operator renames the default tuple columns `_1` and `_2` to `word` and `count` (our call to `toDF`), the `Encrypt` operator comes from our call to `df.encrypted`, and the `Filter` operator comes from our call to `dfEncrypted.filter(...)`.
2. The Analyzed Logical Plan shows the query after analysis, which resolves the `count` column to a specific attribute of the input relation, `count#6`, and reports the output schema.
3. The Optimized Logical Plan shows the query after optimization, including Opaque's rules. All the operator names now start with `Encrypted`, showing that they are Opaque operators that will run inside SGX enclaves; the `Encrypt`, `Project`, and `LocalRelation` operators have been collapsed into a single `EncryptedLocalRelation`.
4. The Physical Plan shows the physical operators selected for each logical operator. For such a simple query, there is a one-to-one correspondence between the two.
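To make the step from the analyzed plan to the optimized plan more concrete, here is a toy model of a rule-based plan rewrite in plain Scala. The case classes and the `rewrite` function are hypothetical and far simpler than Spark's Catalyst or Opaque's actual rules (the `Project` is omitted and conditions are plain strings). The sketch only shows the idea that an `Encrypt` over local data becomes an encrypted relation, and that a filter over encrypted input must itself become an encrypted operator.

```scala
// Toy model of a rule-based plan rewrite. All names here are hypothetical;
// they are not Spark Catalyst's or Opaque's actual classes.
object ToyPlanRewrite {
  sealed trait Plan
  case class LocalRelation(rows: Seq[(String, Int)]) extends Plan
  case class Encrypt(child: Plan) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class EncryptedLocalRelation(rows: Seq[(String, Int)]) extends Plan
  case class EncryptedFilter(condition: String, child: Plan) extends Plan

  /** Rewrites children first (bottom-up), then the current node. */
  def rewrite(plan: Plan): Plan = plan match {
    case Encrypt(child) =>
      rewrite(child) match {
        // Encrypting local data yields an encrypted relation.
        case LocalRelation(rows) => EncryptedLocalRelation(rows)
        case other               => Encrypt(other)
      }
    case Filter(cond, child) =>
      rewrite(child) match {
        // A filter over encrypted input must run as an encrypted operator.
        case enc @ (_: EncryptedLocalRelation | _: EncryptedFilter) => EncryptedFilter(cond, enc)
        case other => Filter(cond, other)
      }
    case leaf => leaf
  }
}
```

For example, `rewrite(Filter("count > 3", Encrypt(LocalRelation(rows))))` returns `EncryptedFilter("count > 3", EncryptedLocalRelation(rows))`, the same shape as the optimized plan in the output of the next cell, while a `Filter` over an unencrypted `LocalRelation` is left unchanged.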

In [8]:
dfEncrypted.filter($"count" > lit(3)).explain(true)

== Parsed Logical Plan ==
'Filter ('count > 3)
+- Encrypt
   +- Project [_1#0 AS word#5, _2#1 AS count#6]
      +- LocalRelation [_1#0, _2#1]

== Analyzed Logical Plan ==
word: string, count: int
Filter (count#6 > 3)
+- Encrypt
   +- Project [_1#0 AS word#5, _2#1 AS count#6]
      +- LocalRelation [_1#0, _2#1]

== Optimized Logical Plan ==
EncryptedFilter (count#6 > 3)
+- EncryptedLocalRelation [word#5, count#6]

== Physical Plan ==
EncryptedFilter (count#6 > 3)
+- EncryptedLocalTableScan [word#5, count#6], [[foo,4], [bar,1], [baz,5]]


Spark's DataFrame API is lazy, so the query hasn't actually run yet. Let's run it and see the results using `.show()`. You should see a pretty-printed table with only two of the three tuples: those whose `count` is greater than 3.

Somewhere above it, the message `Starting an enclave` should be printed. This is confirmation that Opaque is working!
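Laziness can be illustrated with a plain-Scala analogy that uses a collection view instead of a DataFrame. This is only an analogy and says nothing about how Spark itself is implemented: defining the filtered view runs nothing, and the predicate executes only when the result is forced with `toList`, much as `.show` is what triggers the DataFrame query.

```scala
// Analogy only: a Scala collection view is lazy in a way similar to a DataFrame.
object LazinessDemo {
  // Counts how many times the filter predicate has actually run.
  var predicateCalls = 0

  val data = Seq(("foo", 4), ("bar", 1), ("baz", 5))

  // Like dfEncrypted.filter(...): this only describes the computation.
  val query = data.view.filter { case (_, count) =>
    predicateCalls += 1
    count > 3
  }
}
```

Reading `LazinessDemo.predicateCalls` right after the object is initialized gives 0; forcing `LazinessDemo.query.toList` returns `List((foo,4), (baz,5))` and leaves the counter at 3.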

In [9]:
dfEncrypted.filter($"count" > lit(3)).show

Starting an enclave
+----+-----+
|word|count|
+----+-----+
| foo|    4|
| baz|    5|
+----+-----+



Let's look at the raw contents of these DataFrames. Since `df` is unencrypted, we should be able to see its contents in plain text as Spark SQL `Row` objects. On the other hand, `dfEncrypted` should contain an encrypted binary representation of the rows.

Since `dfEncrypted` does not expose its contents as an unencrypted RDD, we have to use the developer API to access them. We have provided a function called `rawContents` to do this. This function in turn uses a helper function called `bytesToString`.

In [11]:
df.rdd.collect()

/** Convert `bytes` to a truncated hex string. */
def bytesToString(bytes: Array[Byte]): String = {
    val limit = 200
    val (truncBytes, ellipsis) = if (bytes.length <= limit) {
        (bytes, "")
    } else {
        (bytes.take(limit), "...")
    }
    truncBytes.map(b => "%02X".format(b)).mkString(
        "%d bytes: [".format(bytes.length), " ", ellipsis + "]\n")
}

/**
 * Given `encDF`, an encrypted DataFrame, extract a string representation
 * of each of its encrypted blocks.
 */
def rawContents(encDF: DataFrame): Array[String] = {
  import edu.berkeley.cs.rise.opaque.execution.OpaqueOperatorExec
  val blocks = encDF.queryExecution.executedPlan.asInstanceOf[OpaqueOperatorExec].executeBlocked().collect()
  for (b <- blocks) yield bytesToString(b.bytes)
}

rawContents(dfEncrypted)

res10_0: Array[Row] = Array([foo,4], [bar,1], [baz,5])
defined function bytesToString
defined function rawContents
res10_3: Array[String] = Array(
  """380 bytes: [0C 00 00 00 00 00 06 00 08 00 04 00 06 00 00 00 04 00 00 00 01 00 00 00 0C 00 00 00 08 00 0C 00 04 00 08 00 08 00 00 00 03 00 00 00 04 00 00 00 48 01 00 00 8A 55 60 2E CE 98 AA 4A 82 C9 E2 95 01 DF 4A 3E 75 A1 AB 83 59 10 8B 86 6E A0 71 54 61 5A FE C9 15 C0 92 B8 8C 07 1C 82 35 DC 9E 73 A3 74 86 73 EB 32 A4 FE F1 C4 17 1A 33 AA BB 68 59 D2 3F AD 5F A9 20 A2 09 D3 1D DF 59 3A 97 2A 61 A9 9F 32 07 BC 0F 84 65 E9 A1 D8 0C 19 E9 3A 72 B3 49 DD 53 5A DC D1 90 98 82 75 69 6A 36 02 BE C9 29 88 D5 CD 61 34 29 81 00 3F 84 EC 21 C5 A8 E4 DF 3F 27 BC 4C 6C F4 3E 2C 79 87 9C 66 05 33 3A A4 39 B8 7E 32 FC...]
"""
)
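This tutorial does not describe Opaque's block format or cipher, so the following is only a sketch of the general idea of authenticated encryption, assuming AES-GCM from the JDK's `javax.crypto` API. The `GcmSketch` object and its IV-prefix layout are illustrative assumptions, not Opaque's implementation or key management. It shows why encrypted bytes reveal nothing readable to someone without the key, while the key holder can still detect tampering.

```scala
import javax.crypto.{Cipher, KeyGenerator, SecretKey}
import javax.crypto.spec.GCMParameterSpec
import java.security.SecureRandom

// Illustration of authenticated encryption with AES-GCM from the JDK.
// The layout (IV prefix, then ciphertext and tag) is an assumption for this
// sketch, not Opaque's actual block format or key management.
object GcmSketch {
  val IvLength = 12  // 96-bit nonce, the usual size for GCM
  val TagBits  = 128 // 16-byte authentication tag

  private val random = new SecureRandom()

  def newKey(): SecretKey = {
    val gen = KeyGenerator.getInstance("AES")
    gen.init(128)
    gen.generateKey()
  }

  /** Returns IV ++ ciphertext ++ tag. A fresh random IV is used for every call. */
  def encrypt(key: SecretKey, plaintext: Array[Byte]): Array[Byte] = {
    val iv = new Array[Byte](IvLength)
    random.nextBytes(iv)
    val cipher = Cipher.getInstance("AES/GCM/NoPadding")
    cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TagBits, iv))
    iv ++ cipher.doFinal(plaintext)
  }

  /** Splits off the IV, then decrypts; throws AEADBadTagException if the bytes were modified. */
  def decrypt(key: SecretKey, blob: Array[Byte]): Array[Byte] = {
    val (iv, body) = blob.splitAt(IvLength)
    val cipher = Cipher.getInstance("AES/GCM/NoPadding")
    cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TagBits, iv))
    cipher.doFinal(body)
  }
}
```

For example, encrypting the 5 bytes of a hypothetical row encoding `"foo,4"` yields 12 + 5 + 16 = 33 bytes. Because the IV is random, encrypting the same row twice gives different bytes, and flipping any byte of the result makes `decrypt` throw.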

Great! Next we will write some Opaque queries on larger datasets.

## Writing Opaque queries

We'll be working with some synthetic medical datasets located in the `opaque/data/disease/` directory:

- `patient-125.csv` contains 125 patient records with patient ID, the ID of the patient's disease, and the patient's name. This is sensitive data and must always be encrypted when we are working with it.
- `disease.csv` contains over 70,000 known diseases. Each disease record contains a disease ID, the ID of the group of genes responsible for the disease, and the name of the disease.
- `treatment.csv` contains about 140,000 potential treatments. Each treatment record contains a treatment ID, the ID of the disease it treats, the name of the treatment, and how much it costs.

First, let's load these datasets and encrypt them.

In [12]:
val patientDF = spark.read.schema(
  StructType(Seq(
    StructField("p_id", IntegerType),
    StructField("p_disease_id", StringType),
    StructField("p_name", StringType))))
  .csv(s"opaque/data/disease/patient-125.csv")
  .encrypted

val diseaseDF = spark.read.schema(
  StructType(Seq(
    StructField("d_disease_id", StringType),
    StructField("d_gene_id", IntegerType),
    StructField("d_name", StringType))))
  .csv(s"opaque/data/disease/disease.csv")
  .encrypted

val treatmentDF = spark.read.schema(
  StructType(Seq(
    StructField("t_id", IntegerType),
    StructField("t_disease_id", StringType),
    StructField("t_name", StringType),
    StructField("t_cost", IntegerType))))
  .csv(s"opaque/data/disease/treatment.csv")
  .encrypted

patientDF: DataFrame = [p_id: int, p_disease_id: string ... 1 more field]
diseaseDF: DataFrame = [d_disease_id: string, d_gene_id: int ... 1 more field]
treatmentDF: DataFrame = [t_id: int, t_disease_id: string ... 2 more fields]

**Exercise:** Now we can get a preview of their contents:

In [14]:
patientDF.show() 
diseaseDF.show()
treatmentDF.show()

+----+------------+--------------------+
|p_id|p_disease_id|              p_name|
+----+------------+--------------------+
|   0|        K006|       Daniel Taylor|
|   1|     W57XXXS|        Helen Garcia|
|   2|     T83411S|        Sharon Lewis|
|   3|        G113|         Mary Miller|
|   4|     W621XXD|         Mark Walker|
|   5|      M10231|Christopher Thompson|
|   6|      I63133|      David Gonzalez|
|   7|     T63444S|    Michael Gonzalez|
|   8|       Y9263|       Elizabeth Lee|
|   9|     S92109K|      James Thompson|
|  10|     T438X1S|       Dorothy Lopez|
|  11|     S52332K|       Maria Jackson|
|  12|       A5030|          Ruth Green|
|  13|     S071XXA|          Mary Scott|
|  14|     S42231K|        George Green|
|  15|      M60269|       George Walker|
|  16|       M6798|        Richard Hill|
|  17|     S6982XD|      Barbara Garcia|
|  18|     S99101A|       Jennifer Hill|
|  19|     V00382D|        Daniel Lewis|
+----+------------+--------------------+
only showing top

**Exercise:** Let's also verify that they contain the expected number of rows.

In [15]:
// TODO: Count the number of rows in each DataFrame (patientDF, diseaseDF, treatmentDF) using count().
patientDF.count() 
diseaseDF.count()
treatmentDF.count()

res14_0: Long = 125L
res14_1: Long = 71486L
res14_2: Long = 142972L

**Exercise:** Now let's run some analytics. For each patient, we want to know the name of the disease they have, not just its ID.

<details>
<summary>Hint 1: Overview</summary>
    The patient names and the disease names are in different DataFrames: <code>patientDF</code> and <code>diseaseDF</code>. To bring them together, you'll need to <em>join</em> the two DataFrames together, then <em>select</em> out the columns we're interested in.
</details>
<details>
<summary>Hint 2: DataFrame syntax</summary>
The syntax to join two DataFrames is <code>dfA.join(dfB, $"column_in_dfA" === $"column_in_dfB")</code>.
    
The syntax to select columns from a DataFrame is <code>df.select($"column1", $"column2")</code>.
</details>
<details>
<summary>Hint 3: Column names</summary>
    You'll want to join <code>patientDF</code>'s <code>p_disease_id</code> column with <code>diseaseDF</code>'s <code>d_disease_id</code> column. Then you can select out the <code>p_name</code> and <code>d_name</code> columns.
</details>
<details>
<summary>Answer</summary>
    <code>patientDF.join(diseaseDF, $"p_disease_id" === $"d_disease_id").select($"p_name", $"d_name").show()</code>
</details>
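To check your understanding of what the join computes, here is the same inner equi-join over plain Scala collections. The case classes are hypothetical stand-ins for rows of `patientDF` and `diseaseDF`, and this hash-join style is just one way to express the result, not how Opaque executes joins inside the enclave. As with the DataFrame inner join, a patient whose disease id has no match is dropped.

```scala
// Plain-Scala sketch of an inner equi-join. The case classes are hypothetical
// stand-ins for rows of patientDF and diseaseDF.
object JoinSketch {
  case class Patient(pId: Int, pDiseaseId: String, pName: String)
  case class Disease(dDiseaseId: String, dGeneId: Int, dName: String)

  /** Like joining on p_disease_id === d_disease_id, then selecting (p_name, d_name). */
  def patientDiseases(patients: Seq[Patient], diseases: Seq[Disease]): Seq[(String, String)] = {
    // Index one side by the join key, then probe it with each row of the other side.
    val diseasesById: Map[String, Seq[Disease]] = diseases.groupBy(_.dDiseaseId)
    for {
      p <- patients
      d <- diseasesById.getOrElse(p.pDiseaseId, Seq.empty) // no match: the patient is dropped
    } yield (p.pName, d.dName)
  }
}
```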

In [12]:
// TODO: Join patientDF and diseaseDF to correlate patient names with disease names.


**Exercise:** How expensive is each disease to treat? For each disease, let's find the price of the lowest-cost treatment. (No need to find the treatment ID, just its cost.)

<details>
<summary>Hint 1: Overview</summary>
    Remember that each treatment in <code>treatmentDF</code> treats a particular disease, identified by disease id. We want to <em>group</em> treatments by the disease they treat, then perform an <em>aggregation</em> within each group to find the <em>minimum</em>-cost treatment.
</details>
<details>
<summary>Hint 2: DataFrame syntax</summary>
The syntax to group a DataFrame by a particular column is <code>df.groupBy($"column")</code>.
    
The syntax to aggregate a grouped DataFrame to find the minimum value for each group is <code>df.groupBy($"column1").agg(min($"column2"))</code>.
</details>
<details>
<summary>Answer</summary>
    <code>treatmentDF.groupBy($"t_disease_id").agg(min("t_cost").as("t_min_cost")).show()</code>
</details>
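The grouping and aggregation can likewise be sketched over plain Scala collections. `Treatment` is a hypothetical stand-in for a row of `treatmentDF`: `groupBy` buckets treatments by disease id and `min` reduces each bucket, mirroring `groupBy($"t_disease_id").agg(min("t_cost"))`. Like the DataFrame result, the returned `Map` has no meaningful order.

```scala
// Plain-Scala sketch of groupBy + min. Treatment is a hypothetical stand-in
// for a row of treatmentDF.
object MinCostSketch {
  case class Treatment(tId: Int, tDiseaseId: String, tName: String, tCost: Int)

  /** Minimum t_cost per t_disease_id, like groupBy(...).agg(min("t_cost")). */
  def minCostByDisease(treatments: Seq[Treatment]): Map[String, Int] =
    treatments
      .groupBy(_.tDiseaseId)                // one group per disease id
      .map { case (diseaseId, group) =>
        diseaseId -> group.map(_.tCost).min // reduce each group to its minimum cost
      }
}
```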

In [15]:
treatmentDF.groupBy($"t_disease_id").agg(min("t_cost").as("t_min_cost")).show()

**Exercise:** Now let's put all three datasets together. For each patient, what disease do they have, and what is the lowest cost to treat it?

<details>
<summary>Hint 1: Overview</summary>
    Since this query involves correlating data from all three DataFrames, we'll need to <em>join</em> all of them. The common key that they all share is the <em>disease id</em>. Then we can <em>select</em> out the three columns we're interested in: patient name, disease name, and minimum treatment cost.
</details>
<details>
<summary>Hint 2: DataFrame syntax</summary>
To join three DataFrames together, we have to join them in pairs: <code>dfA.join(dfB.join(dfC, $"b_join_id" === $"c_join_id"), $"a_join_id" === $"b_join_id")</code>.
</details>
<details>
<summary>Answer</summary>
    <pre>val minCostTreatments = treatmentDF.groupBy($"t_disease_id").agg(min("t_cost").as("t_min_cost"))
minCostTreatments.join(
  diseaseDF.join(
    patientDF,
    $"d_disease_id" === $"p_disease_id"),
  $"d_disease_id" === $"t_disease_id")
  .select($"p_name", $"d_name", $"t_min_cost")
  .show()</pre>
</details>
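Putting the two ideas together, the report can be modeled as a minimum-cost lookup followed by two inner joins. As before, the case classes are hypothetical stand-ins for the rows of the three DataFrames. The sketch keeps the patients' order only because it works on a plain `Seq`; Spark makes no ordering guarantee without an explicit `orderBy`.

```scala
// Plain-Scala sketch of the three-way report. The case classes are hypothetical
// stand-ins for rows of patientDF, diseaseDF, and treatmentDF.
object ReportSketch {
  case class Patient(pId: Int, pDiseaseId: String, pName: String)
  case class Disease(dDiseaseId: String, dGeneId: Int, dName: String)
  case class Treatment(tId: Int, tDiseaseId: String, tName: String, tCost: Int)

  /** (p_name, d_name, t_min_cost) for every patient whose disease has a known treatment. */
  def report(patients: Seq[Patient], diseases: Seq[Disease], treatments: Seq[Treatment]): Seq[(String, String, Int)] = {
    // Step 1: the grouped aggregate, i.e. the minimum cost per disease id.
    val minCost: Map[String, Int] =
      treatments.groupBy(_.tDiseaseId).map { case (id, group) => id -> group.map(_.tCost).min }
    // Step 2: index diseases by id for the disease/patient join.
    val diseasesById: Map[String, Seq[Disease]] = diseases.groupBy(_.dDiseaseId)
    // Step 3: inner joins; rows without a match at either step are dropped.
    for {
      p    <- patients
      d    <- diseasesById.getOrElse(p.pDiseaseId, Seq.empty)
      cost <- minCost.get(d.dDiseaseId).toSeq
    } yield (p.pName, d.dName, cost)
  }
}
```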

In [None]:
// TODO: Join patientDF, diseaseDF, and the grouped version of treatmentDF from above to generate this report.