In [None]:
// Add the "bootstrap" facility to the notebook by loading the ammonium-util artifact
// (organization com.videoamp, version 2.0.1) through the kernel's Ivy loader.
// Run this cell first: the later call to vamp.ammonium.bootstrap presumably comes from this
// library (TODO confirm)
interp.load.ivy("com.videoamp" %% "ammonium-util" % "2.0.1")

In [None]:
// Import the ammonite.ops package members. See http://ammonite.io/#Ammonite-Ops for more info
import ammonite.ops._

In [None]:
// IP address of your Spark master. The empty string is only a placeholder: fill it in before
// running the bootstrap cell, which receives this value
val masterIP: String = ""

In [None]:
// Download the Spark libraries from the master and prepare Spark for configuration.
// Execute this once the master node is available, and only after masterIP has been set:
// the value of masterIP is passed straight through to the bootstrap call
vamp.ammonium.bootstrap(masterIP)

In [None]:
// The Scala kernel can run so-called Scala scripts. See http://ammonite.io/#ScalaScripts
// for more information. For example, to load the PostgreSQL JDBC driver and configure
// it for use with Spark, run
// interp.load.module(root/'home/'jupyter/'scripts/"postgresql.sc")

In [None]:
// This script defines an implicit class which adds the display() and display(count: Int) methods
// to the DataFrame type. Consider these methods to be more readable alternatives to show() and
// show(count: Int).
//
// interp.load.module(root/'home/'jupyter/'scripts/"displayable_data_frame.sc")

In [None]:
// Spark Configuration

// Name of your Spark app. Replace the empty placeholder with something recognizable
val appName: String = ""

// Upper bound on the number of cores your app may use. Leaving this at Int.MaxValue configures
// Spark to allocate all available cores to your app
val appCores: Int = Int.MaxValue

// Default level of parallelism for your Spark app. 512 is the recommended minimum; double it
// (1024, 2048, ...) to help address executor OOM errors in apps with large storage memory
// requirements
val parallelism: Int = 512

// Set your desired executor heap size (in GB). The recommended range is 8 to 16. The JVM can
// struggle collecting garbage efficiently in Spark executors with large heap spaces
val executorHeapGB = 16

// The number of cluster cores to assign to each executor. To run one executor per worker, set
// this to the number of cores per worker node. To run two executors per worker, set it to half
// the number of cores per worker, etc. One executor per worker is recommended unless you need
// more than 16 GB of heap per worker. In this case, two executors with 16 GB of heap each is
// recommended rather than one executor with 32 GB
val coresPerExecutor = 32

// Set this to your worker's maximum allocated memory (in GB). 50 is recommended for c3.8xl
// workers and 220 is recommended for r3.8xl workers
val totalExecutorMemoryGB = 50

// This notebook enables off-heap memory and sizes it from the difference between the two values
// above, so that difference must be strictly positive. Fail here with a readable message rather
// than handing Spark a zero or negative off-heap size
require(
  totalExecutorMemoryGB > executorHeapGB,
  s"totalExecutorMemoryGB ($totalExecutorMemoryGB) must be greater than " +
    s"executorHeapGB ($executorHeapGB) because the remainder is used as off-heap memory"
)

// This allocates the remainder of your worker's memory to off-heap memory, expressed in bytes
// (GB * 1024^3, computed as a Long). Do not change this unless you have good reason
val executorOffHeapBytes = (totalExecutorMemoryGB - executorHeapGB) * (1024L * 1024L * 1024L)

// Recommended JVM flags for the executors: use the parallel garbage collector, write a heap dump
// under /scratch1/heapdumps when an OutOfMemoryError occurs, and enable class histogram printing.
// The list is joined with spaces when it is handed to Spark
val executorFlags = List(
  "-XX:+UseParallelGC",
  "-XX:+HeapDumpOnOutOfMemoryError",
  "-XX:HeapDumpPath=/scratch1/heapdumps",
  "-XX:+PrintClassHistogram"
)

// These tune some advanced settings to recommended values.
// Size-valued settings carry an explicit unit suffix: Spark reads a bare number for
// spark.driver.maxResultSize as a byte count, so "2048" would mean 2048 bytes, not 2048 MB.
// "2g" caps the serialized results collected to the driver at 2 GB
sparkConf
  .set("spark.driver.maxResultSize", "2g")
  .set("spark.kryoserializer.buffer.max", "1g")
  .set("spark.rdd.compress", "true")

// You can set Hadoop configuration properties by prefixing a Hadoop configuration key with
// "spark.hadoop." (note the trailing dot). For example, to set the default HDFS replication
// level to 2:
// sparkConf
//   .set("spark.hadoop.dfs.replication", "2")

// Uncomment to set additional configuration here
// sparkConf
//   .set("spark.foo", "bar")
//   .set("spark.biz", "baz")

// These settings simply reflect the values set above. Do not modify this.
//   - appName, appCores and coresPerExecutor are passed through as the app name, the app-wide
//     core limit and the per-executor core count
//   - parallelism is applied to both spark.default.parallelism and spark.sql.shuffle.partitions
//   - the executor heap is given in GB with an explicit "g" suffix (built with string
//     interpolation rather than Int + String concatenation, which newer Scala deprecates)
//   - off-heap memory is switched on and sized with the byte count in executorOffHeapBytes
//   - the executor JVM flags are joined into one space-separated option string
sparkConf
  .setAppName(appName)
  .set("spark.cores.max", appCores.toString)
  .set("spark.executor.cores", coresPerExecutor.toString)
  .set("spark.default.parallelism", parallelism.toString)
  .set("spark.sql.shuffle.partitions", parallelism.toString)
  .set("spark.executor.memory", s"${executorHeapGB}g")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", executorOffHeapBytes.toString)
  .set("spark.executor.extraJavaOptions", executorFlags.mkString(" "))

In [None]:
// At this point you should have access to a `SparkSession` from the `spark` val, e.g.
// spark.table("mydumbdatabase.mydumbtable")

In [None]:
// Helpful imports

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel.OFF_HEAP

import org.apache.spark.sql.{ Column, DataFrame, Dataset }
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import spark.implicits._
import spark.table