### Initialize Spark session

In [28]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

// Build the SparkConf first so the application name is set explicitly,
// then create (or reuse) the session from that configuration.
println("Initializing Spark context...")
val conf = new SparkConf()
  .setAppName("Example App")
val spark: SparkSession = SparkSession
  .builder()
  .config(conf)
  .getOrCreate()
// If you look in the Spark Master UI, an application will be running after this.

Initializing Spark context...


conf = org.apache.spark.SparkConf@2bbffd76
spark = org.apache.spark.sql.SparkSession@1dcacea6


lastException: Throwable = null


org.apache.spark.sql.SparkSession@1dcacea6

### Custom Jar import

In [29]:
// Make the contents of /app/app.jar available to the cells that follow.
println("Import custom jar in the notebook with a special Toree directive")
// %AddJar is a Toree line magic, not Scala: it downloads the jar from the given URL.
%AddJar file:///app/app.jar

Import custom jar in the notebook with a special Toree directive
Starting download from file:///app/app.jar
Finished download of app.jar


In [30]:
// Check that a class from the custom jar can be imported and used.
println("Importing custom class")
import app.Point
val p = new Point(1,2)
// move(3,5) prints the resulting coordinates (4 and 7 in the recorded output), so it
// presumably shifts the point by the given offsets — verify against app.Point.
p.move(3,5)

Importing custom class
Point x location : 4
Point y location : 7


p = app.Point@6db4b5e6


app.Point@6db4b5e6

### Hello World using remote Spark master

In [32]:
// Smoke test against the remote Spark master: distribute a small collection
// and run an action on it so that a job is actually executed.
println("************")
println("Hello, world!")
// Pass the range itself. Array(1 to 10) is a one-element Array[Range], which would
// produce an RDD holding a single Range object instead of the ten integers.
val rdd = spark.sparkContext.parallelize(1 to 10)
// count() is an action and forces the job to run; its result (10) is not printed.
rdd.count()
println("************")

************
Hello, world!
************


rdd = ParallelCollectionRDD[22] at parallelize at <console>:136


ParallelCollectionRDD[22] at parallelize at <console>:136

### HDFS tests

In [33]:
// Resolve the HDFS endpoint once from the environment and point a Hadoop client
// configuration at it. sys.env(...) throws NoSuchElementException if HDFS_URL is unset.
val hdfsPrefix: String = sys.env("HDFS_URL")
val hadoopConf: Configuration = new Configuration()
hadoopConf.set("fs.defaultFS", hdfsPrefix)
val hdfs: FileSystem = FileSystem.get(hadoopConf)

hdfsPrefix = hdfs://namenode1:8020
hadoopConf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml
hdfs = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-915978460_230, ugi=root (auth:SIMPLE)]]


DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-915978460_230, ugi=root (auth:SIMPLE)]]

In [34]:
// Upload the sample data set from the local file system to HDFS.
println("Copying sample file to HDFS...")
// No scheme here: copyFromLocalFile reads the source from the local file system.
val srcPath = new Path("/notebooks/people.csv")
// The URI has no authority; presumably it is resolved against the fs.defaultFS
// configured for `hdfs` — confirm if the namenode address matters here.
val destPath = new Path("hdfs:///test/people.csv")
hdfs.copyFromLocalFile(srcPath, destPath)

Copying sample file to HDFS...


srcPath = /notebooks/people.csv
destPath = hdfs:/test/people.csv


hdfs:/test/people.csv

In [35]:
// Read the uploaded CSV back from HDFS into a DataFrame and print summary statistics.
println("Load CSV from HDFS to Dataframe")
// "csv" is Spark's built-in CSV data source (Spark 2.0+). The previous
// "com.databricks.spark.csv" name is the legacy alias of the external spark-csv package.
val df = spark.read
  .format("csv")
  .option("inferSchema", "true") // detect column types from the data instead of reading all as strings
  .option("header", "true")      // the first line holds the column names
  .load(s"$hdfsPrefix/test/people.csv")
df.describe().show()

Load CSV from HDFS to Dataframe
+-------+-----------------+------------------+------------------+
|summary|            Index|            Height|            Weight|
+-------+-----------------+------------------+------------------+
|  count|            25000|             25000|             25000|
|   mean|          12500.5| 67.99311359679979|127.07942116079916|
| stddev|7217.022700994273|1.9016787712056056|11.660897563604271|
|    min|                1|          60.27836|          78.01476|
|    max|            25000|           75.1528|           170.924|
+-------+-----------------+------------------+------------------+



df = [Index: int, Height: double ... 1 more field]


[Index: int, Height: double ... 1 more field]

In [36]:
// Register the DataFrame as the temporary view "people" and query it with Spark SQL.
println("Perform some SQL over CSV contents")
df.createOrReplaceTempView("people")
// BETWEEN is inclusive on both ends: keeps rows with 68 <= Height <= 71.
val df2 = spark.sql("SELECT * FROM people WHERE Height BETWEEN 68 AND 71")
df2.describe().show()

Perform some SQL over CSV contents
+-------+------------------+------------------+------------------+
|summary|             Index|            Height|            Weight|
+-------+------------------+------------------+------------------+
|  count|             11050|             11050|             11050|
|   mean|12598.144977375565| 69.22068299909508| 130.8976952787331|
| stddev| 7230.438697100867|0.8064637962888647|10.368627688958973|
|    min|                 3|          68.00003|          90.53995|
|    max|             25000|          70.99707|           170.924|
+-------+------------------+------------------+------------------+



df2 = [Index: int, Height: double ... 1 more field]


[Index: int, Height: double ... 1 more field]

In [37]:
// Write the filtered rows back to HDFS as CSV.
println("Save CSV using Dataframe")
// Spark writes one part file per partition, so after repartition(5) the target path
// is a directory of part files rather than a single CSV file.
df2.repartition(5).write
  .format("csv")                // built-in CSV source instead of the legacy spark-csv alias
  .option("header", "true")
  .mode(SaveMode.Overwrite)     // typed constant (already imported) instead of the "overwrite" string
  .save(s"$hdfsPrefix/test/people-result.csv")

Save CSV using Dataframe


### Custom Maven dependency

In [26]:
// Pull the Vegas plotting library and its Spark integration from Maven.
// println (not print) so the directives' progress output starts on its own line.
println("Add Vegas lib")
// %AddDeps is a Toree line magic taking group, artifact and version;
// --transitive also fetches the artifact's own dependencies.
%AddDeps org.vegas-viz vegas_2.11 0.3.11 --transitive
%AddDeps org.vegas-viz vegas-spark_2.11 0.3.11

Add Vegas libMarking org.vegas-viz:vegas_2.11:0.3.11 for download
Obtained 42 files
Marking org.vegas-viz:vegas-spark_2.11:0.3.11 for download
Obtained 2 files


### Show some graphs (finally!)

In [7]:
import vegas._
import vegas.sparkExt._

// Bar chart of Height against the row Index, fed directly from the DataFrame.
// withDataFrame is presumably provided by the vegas.sparkExt import — confirm.
Vegas("Person Heights", width=400, height=100)
  .withDataFrame(df)
  .encodeX("Index", Quantitative)
  .encodeY("Height", Quantitative)
  .mark(Bar)
  .show

// Same chart layout for the Weight column.
Vegas("Person Weights", width=400, height=100)
  .withDataFrame(df)
  .encodeX("Index", Quantitative)
  .encodeY("Weight", Quantitative)
  .mark(Bar)
  .show

### Stop application

In [19]:
// Shut down the Spark session used by this notebook.
println("Stop Spark session")
spark.stop()
// If you look in the Spark Master UI, no application will be running after stop.

Stop Spark session
