Spark Intro

Hoa Nguyen edited this page Jan 8, 2018 · 16 revisions

Introduction

Spark is a general-purpose computation engine that uses distributed memory to perform fault-tolerant computations on a cluster. Even though Spark is relatively new, it is one of the most popular open-source technologies at the moment and has begun to surpass Hadoop’s MapReduce model. This is partly because Spark’s Resilient Distributed Dataset (RDD) model can do everything the MapReduce paradigm can, and more. In addition, Spark can perform iterative computations at scale, which makes it possible to run machine learning algorithms (with the Spark MLlib library) much faster than with Hadoop MapReduce alone.

Dependencies

AWS Instances - minimum t2.medium due to memory usage
Hadoop 2.4 or higher

Setup Spark standalone

Spark will be installed on the master and all the workers. The workers will then be configured by modifying spark-env.sh on each node and by listing them in a slaves file on the master node.

Run the following on the master and all workers by SSH-ing into each node:

Install java-development-kit and scala
master-slave-node$ sudo apt-get update
master-slave-node$ sudo apt-get install openjdk-7-jdk scala

Install sbt
master-slave-node$ wget https://dl.bintray.com/sbt/debian/sbt-0.13.7.deb -P ~/Downloads
master-slave-node$ sudo dpkg -i ~/Downloads/sbt-0.13.7.deb
master-slave-node$ sudo apt-get install sbt

Install Spark
master-slave-node$ wget http://apache.mirrors.tds.net/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz -P ~/Downloads
master-slave-node$ sudo tar zxvf ~/Downloads/spark-1.6.1-bin-hadoop2.6.tgz -C /usr/local
master-slave-node$ sudo mv /usr/local/spark-1.6.1-bin-hadoop2.6 /usr/local/spark
master-slave-node$ sudo chown -R ubuntu /usr/local/spark

Set the SPARK_HOME environment variable and add to PATH in .profile
master-slave-node$ sudo nano ~/.profile
Add the following to ~/.profile and source it

export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin

master-slave-node$ . ~/.profile

Set the Java path for spark-env
master-slave-node$ cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh

master-slave-node$ sudo nano $SPARK_HOME/conf/spark-env.sh
Add the following to the spark-env.sh

export JAVA_HOME=/usr
export SPARK_PUBLIC_DNS="<public-dns>"
export SPARK_WORKER_CORES=$(echo $(nproc)*3 | bc)

Configure worker nodes on master
SSH into the master node and run the following:

SSH into master node
localhost$ ssh -i ~/.ssh/personal_aws.pem ubuntu@master-public-dns

Create a slaves file under $SPARK_HOME/conf
master-node$ touch $SPARK_HOME/conf/slaves

Add each worker’s public dns to the slaves configuration file
master-node$ echo <slave-public-dns> >> $SPARK_HOME/conf/slaves

Start Spark on master
master-node$ $SPARK_HOME/sbin/start-all.sh

You can check whether your standalone cluster is up and running by opening the WebUI at master-public-dns:8080. Be sure the number of workers listed matches what you expect; for example, a cluster of 4 nodes should show 3 acting as workers.

Word Count problem with the spark-shell REPL on the master node

Make example text file and load it onto HDFS

master-node$ mkdir ~/spark-examples
master-node$ nano ~/spark-examples/test.txt

and insert the following text:

Alice sits drowsily by a riverbank, bored by the book her older sister reads to her.
Out of nowhere, a White Rabbit runs past her, fretting that he will be late.
The Rabbit pulls a watch out of his waistcoat pocket
and runs across the field and down a hole.

We can now copy this onto HDFS so all the worker nodes can access the file. If the user folder has not been created yet, please make it on HDFS

master-node$ hdfs dfs -mkdir /user
master-node$ hdfs dfs -copyFromLocal ~/spark-examples/test.txt /user/test.txt

Start the Spark shell

Run the following to start the REPL. master-hostname refers to the hostname of your master node. You can find the hostname of your master node with the following command.

master-node$ hostname
ip-172-31-8-228
master-node$ $SPARK_HOME/bin/spark-shell --master spark://master-hostname:7077

or if you want to use PySpark

master-node$ $SPARK_HOME/bin/pyspark --master spark://master-hostname:7077

You should see an output with the Spark ASCII art and ending with the scala> or >>> prompt.
Note: The Spark REPL interprets relative file paths from the directory that you launched it from, so it is simplest to launch it from your home (a.k.a. ~) directory. If you’re in a different directory, modify the file path accordingly or use CTRL + D to exit, change directories and launch the REPL again. However, keep in mind that all values and variables are lost when you restart the REPL.

You can also go to master-public-dns:4040 and check to see that you have the expected number of executors while running the Spark shell

We can now write WordCount in Scala or Python by executing three concise commands:

scala> val file = sc.textFile("hdfs://<public_dns>:9000/user/test.txt")
scala> val counts = file.flatMap(line => line.split(" "))
   .map(word => (word, 1))
   .reduceByKey(_ + _)
scala> counts.collect().mkString("\n")

or if you want to use PySpark

>>> file = sc.textFile("hdfs://<public_dns>:9000/user/test.txt")
>>> counts = file.flatMap(lambda line: line.split(" "))\
           .map(lambda word: (word, 1))\
           .reduceByKey(lambda a, b: a + b)
>>> res = counts.collect()
>>> for val in res:
...     print(val)

The first line simply loads the text file into a value named file using the special sc object. sc is the SparkContext, the entry point for creating RDDs and talking to the cluster, loosely similar to Hadoop’s Context class.

The second command creates a value named counts that holds the key-value pairs for the word count; this is where all the logic is. flatMap splits each line on spaces and flattens the result (i.e. turns a list of lists into one “flat list” of all the words). Each word in the list is then mapped to a key-value pair [e.g. (Rabbit, 1)], and reduceByKey sums the values for each key using an anonymous function (a.k.a. a lambda). Part of the reason that this code is so concise is the functional programming style, but don’t worry about that now if you’re unfamiliar with it.
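
To make the three steps concrete, here is a minimal plain-Python sketch that mimics them on the example text without using Spark at all. It is only an analogy: the names words, pairs and counts are illustrative, and a real RDD would distribute this work across the workers instead of looping on one machine.

```python
# Plain-Python analogue of flatMap -> map -> reduceByKey (no Spark required).
from collections import defaultdict

lines = [
    "Alice sits drowsily by a riverbank, bored by the book her older sister reads to her.",
    "Out of nowhere, a White Rabbit runs past her, fretting that he will be late.",
    "The Rabbit pulls a watch out of his waistcoat pocket",
    "and runs across the field and down a hole.",
]

# flatMap: split every line on spaces and flatten into one list of words
words = [word for line in lines for word in line.split(" ")]

# map: pair every word with the count 1
pairs = [(word, 1) for word in words]

# reduceByKey: sum the counts of all pairs that share the same key
counts = defaultdict(int)
for word, one in pairs:
    counts[word] += one

for word, count in sorted(counts.items()):
    print(word, count)
```

Because the split is on single spaces only, punctuation stays attached to words, so "her", "her," and "her." are counted as three different keys, exactly as they would be in the Spark version.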

You may have noticed that reading the file in and performing the flatMap on the RDD was fairly fast to execute. Spark actually performs lazy evaluation, which means it will not compute the output until an action is performed. Most of the functions in Spark can be classified as either a transformation or an action. When you chain transformations together, such as flatMap and reduceByKey, you are in fact creating a Directed Acyclic Graph (DAG) of the tasks to be performed. It is only when you call an action such as saveAsTextFile or collect that Spark will perform the full operation and send the data through the DAG. The purpose of this is to let Spark optimize the DAG for performance.
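
Python generators behave lazily in a loosely similar way, which can help build intuition. The sketch below is an analogy only, not how Spark schedules its DAG; split_words and the calls list are hypothetical helpers used to observe when work actually happens.

```python
# Chaining generators only describes the pipeline; nothing runs until
# a result is requested, much like transformations followed by an action.
calls = []  # records which input lines have actually been processed

def split_words(lines):
    for line in lines:
        calls.append(line)  # side effect so that execution can be observed
        for word in line.split(" "):
            yield word

lines = ["and runs across the field", "and down a hole."]

# "Transformations": building the pipeline does not touch the data yet
words = split_words(lines)
pairs = ((word, 1) for word in words)
processed_before_action = len(calls)  # nothing has been processed so far

# "Action": materialising the result forces the whole pipeline to run
result = list(pairs)
processed_after_action = len(calls)   # every input line has now been read

print(processed_before_action, processed_after_action)
```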

Your output should list each distinct word together with its count, in both the Scala and Python versions.

Note that you can track this job using the WebUI at master-public-dns:4040

You can enter CTRL + D to exit the REPL.

Writing a Spark standalone application

Playing with the Spark shell is great for prototyping, but normally an application is preferred so that it can run periodically and update values in databases. We will now run through an example that involves some cleaning of the data. We will write the application in Scala, compile the code into a jar, and then submit it to Spark. This example also assumes that you have Hadoop installed on the cluster.

First download the dataset from the Google Drive link to your local machine: price data

The data is a couple of days of tick-level Gold Futures data from the CME Group. Each row contains a timestamp, the last price at that time, and the number of contracts (or volume) traded at that price. Each row has the following format:

Schema:
<year><month><day> <hour><minute><seconds>; <price>; <volume>

Example:
20140602 223412; 1250.5; 12
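
As a rough illustration of how one such row breaks down into typed fields, here is a small Python sketch. The helper parse_tick and the sample row are hypothetical; the row follows the compact timestamp layout given in the schema, and the real files may differ in whitespace.

```python
def parse_tick(line):
    """Split one 'timestamp; price; volume' row into typed fields."""
    # strip() removes any spaces left around the ';' separators
    timestamp, price, volume = (field.strip() for field in line.split(";"))
    return timestamp, float(price), int(volume)

# Hypothetical row, not taken from the dataset
tick = parse_tick("20140602 223412; 1250.5; 12")
print(tick)
```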

Let’s now transfer these files over to your Hadoop cluster and load them onto HDFS into the folder ‘user’

local_machine:~$ scp -r -i   ubuntu@:~/price_data

master-node:~$ hdfs dfs -mkdir -p /user/price_data
master-node:~$ hdfs dfs -copyFromLocal ~/price_data/*.txt /user/price_data

We would like to compute the average price and total volume traded at each 30 minute interval. To do this we will need to map each timestamp to a 30 minute time slot. Next we will perform an average on the price and a sum of the contracts traded in each 30 minute time slot.
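
Before writing the Spark version, the computation can be sketched in plain Python on a handful of made-up ticks. This is only an illustration under stated assumptions: the ticks below are hypothetical, timestamps are assumed to be in the compact 'YYYYMMDD HHMMSS' layout from the schema, and to_30min_slot is an illustrative helper name.

```python
from collections import defaultdict

def to_30min_slot(timestamp):
    """Round 'YYYYMMDD HHMMSS' down to the start of its 30 minute slot."""
    minute = int(timestamp[11:13]) // 30 * 30  # either 0 or 30
    return timestamp[:11] + "%02d" % minute + "00"

# Hypothetical (timestamp, price, volume) ticks, not taken from the dataset
ticks = [
    ("20140602 223412", 1250.5, 12),
    ("20140602 224501", 1251.5, 3),
    ("20140602 230210", 1249.0, 7),
]

# slot -> [sum of prices, number of ticks, total volume]
totals = defaultdict(lambda: [0.0, 0, 0])
for timestamp, price, volume in ticks:
    slot = totals[to_30min_slot(timestamp)]
    slot[0] += price
    slot[1] += 1
    slot[2] += volume

# slot -> (average price, total volume)
summary = {slot: (s[0] / s[1], s[2]) for slot, s in totals.items()}
for slot in sorted(summary):
    print(slot, summary[slot])
```

The first two ticks fall into the same 22:30 slot, so their prices are averaged and their volumes summed, while the third tick lands in the 23:00 slot on its own.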

Let’s also set up the project directory from the home directory on the Spark master node

master-node:~$ mkdir -p my-project/src/main/scala

Next we will create an sbt file named exercise.sbt in the project folder

master-node:~$ nano my-project/exercise.sbt

Place the following into exercise.sbt. This will be used later when we compile our scala application.

name := "price_data"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.2.1" % "provided"
)

Now create a file under my-project/src/main/scala named price_data.scala. This will be our main scala application which will compute the average price and total volume for each 30 minute period.

master-node:~$ nano my-project/src/main/scala/price_data.scala

You can paste the following into price_data.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object price_data {
 def main(args: Array[String]): Unit = {

   // setup the Spark Context named sc
   val conf = new SparkConf().setAppName("PriceDataExercise")
   val sc = new SparkContext(conf)

   // folder on HDFS to pull the data from
   val folder_name = "hdfs://<public_dns>:9000/user/price_data"

   // function to convert a timestamp to a 30 minute time slot
   def convert_to_30min(timestamp: String): String = {
       val min30 = timestamp.slice(11,13).toInt/30*30
       timestamp.take(11) + f"${min30}%02d" + "00"
   }

   // read in the data from HDFS
   val file = sc.textFile(folder_name)

   // map each record into a tuple consisting of (time, price, volume);
   // trim each field so that spaces around the ";" separators do not break toInt
   val ticks = file.map(line => {
                        val record = line.split(";").map(_.trim)
                        (record(0), record(1).toDouble, record(2).toInt)
                                })

   // apply the time conversion to the time portion of each tuple and persist it in memory for later use
   val ticks_min30 = ticks.map(record => (convert_to_30min(record._1),
                                          record._2,
                                          record._3)).persist

   // compute the average price for each 30 minute period
   val price_min30 = ticks_min30.map(record => (record._1, (record._2, 1)))
                                .reduceByKey( (x, y) => (x._1 + y._1,
                                                         x._2 + y._2) )
                                .map(record => (record._1,
                                                record._2._1/record._2._2) )

   // compute the total volume for each 30 minute period
   val vol_min30 = ticks_min30.map(record => (record._1, record._3))
                              .reduceByKey(_+_)

   // join the two RDDs into a new RDD containing tuples of (30 minute time periods, average price, total volume)
   val price_vol_min30 = price_min30.join(vol_min30)
                                    .sortByKey()
                                    .map(record => (record._1,
                                                    record._2._1,
                                                    record._2._2))

   // save the data back into HDFS
   price_vol_min30.saveAsTextFile("hdfs://<public_dns>:9000/user/price_data_output_scala")
 }
}

The Scala application can be built by moving into the project folder and compiling the application using sbt. The jar file will be created under the target folder. We can then submit it to the cluster by calling spark-submit. Here master_hostname should be only the hostname of your master node.

master-node:~$ cd my-project
master-node:~/my-project$ sbt package
master-node:~/my-project$ spark-submit --class price_data --master spark://master_hostname:7077 target/scala-2.11/price_data_2.11-1.0.jar

You will notice that the output is actually split into two part files for this example. This is because Spark read the input in as two partitions by default, and each partition is written to its own file. If you wanted a single file in the end, you could call .coalesce(1) on the last RDD price_vol_min30 before saving it to HDFS.
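
The relationship between partitions and output files can be illustrated with a toy model in plain Python. This is a sketch only, not Spark’s actual writer: save_as_text_files is a hypothetical helper, the records are made up, and the _SUCCESS marker that Spark also writes is left out.

```python
import os
import tempfile

def save_as_text_files(partitions, folder):
    """Write one part-NNNNN file per partition, mimicking the output layout."""
    os.makedirs(folder)
    for index, records in enumerate(partitions):
        with open(os.path.join(folder, "part-%05d" % index), "w") as out:
            for record in records:
                out.write("%s\n" % (record,))
    return sorted(os.listdir(folder))

# Hypothetical (time slot, average price, total volume) records
records = [("20140602 223000", 1251.0, 15), ("20140602 230000", 1249.0, 7)]
base = tempfile.mkdtemp()

# Two partitions produce two part files
two_parts = save_as_text_files([records[:1], records[1:]], os.path.join(base, "two"))

# Merging everything into a single partition first produces a single file
one_part = save_as_text_files([records], os.path.join(base, "one"))

print(two_parts)
print(one_part)
```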

master-node:~$ hdfs dfs -ls /user/price_data_output_scala

-rw-r--r--   3 ubuntu supergroup          0 2015-05-21 21:50 /user/price_data_output_scala/_SUCCESS
-rw-r--r--   3 ubuntu supergroup       1074 2015-05-21 21:50 /user/price_data_output_scala/part-00000
-rw-r--r--   3 ubuntu supergroup       1037 2015-05-21 21:50 /user/price_data_output_scala/part-00001

You can inspect the output for this dataset by executing the following commands:

master-node:~$ hdfs dfs -cat /user/price_data_output_scala/part-00000

master-node:~$ hdfs dfs -cat /user/price_data_output_scala/part-00001

We can see here that the timestamps have been changed to 30 minute windows and the price is now a floating point average. The volume is larger because it is the sum of all contracts traded in each time period.