<img src="uva_seal.png">  

## Spark Getting Started

### University of Virginia
### DS 5110: Big Data Systems
### Last Updated: February 8, 2021

---  


### SOURCES
Learning Spark, First Edition

Chapter 1: Introduction to Data Analysis with Spark  
Chapter 2: Getting Started

### OBJECTIVES
-  Spark background
-  Setup and installation
-  Basic concepts
-  Minimal code examples
-  Running Spark: Interactive Session
-  Running Spark: Command Line

### CONCEPTS

- Cluster: a set of connected computers (nodes)

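- Functional programming - passing functions (often Python `lambda` expressions) as arguments to operations such as `map` and `filter`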

- SparkSession - single point of entry for interacting with Spark functionality

- Resilient Distributed Datasets (RDDs) - Spark’s fundamental abstraction for distributed data and computation

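- Dataset - a distributed collection of structured data; together with the DataFrame, part of the higher-level API discussed later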

- Driver Program - contains the application's main function, defines RDDs on the cluster, and applies operations to them.

- Worker Node / Executor - worker nodes are the machines in the cluster; executors are the processes running on them that perform the tasks

---

### Spark Background

- Designed to be fast  
No waiting around for hours; users need to work interactively with data  


- Designed to handle big data


- General purpose  
Unlike Hadoop MapReduce, Spark bundles several modules in one place: machine learning, SQL queries, streaming, graph analytics


- Caching: intermediate data can be kept in memory on the workers and reused across operations


- Highly accessible: simple APIs in Python, Java, Scala, R, and SQL  
Integrates with other big data tools such as Hadoop and Cassandra  
Can access data in HDFS, Amazon S3, and other storage systems


**Documentation from README**  
You can find the latest Spark documentation, including a programming
guide, on the [project web page](http://spark.apache.org/documentation.html).

For general development tips, including info on developing Spark using an IDE, see the [Useful Developer Tools page](http://spark.apache.org/developer-tools.html).

Spark also comes with several sample programs in the `examples` directory.  
To run one of them in a shell, use `./bin/run-example <class> [params]`.  

For example:

`./bin/run-example SparkPi`

will run the Pi example locally.

**Install**  

Page 9 of *Learning Spark* provides step-by-step download and installation instructions.

Dependencies:
1. Python must be installed
2. Java 6 or higher must be installed (recent Spark releases require Java 8 or later)

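Change the logging level (Page 12): in `conf/log4j.properties` (created by copying `conf/log4j.properties.template`), change

`log4j.rootCategory=INFO, console`

to

`log4j.rootCategory=WARN, console`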

### Interactive Python shell

From the Spark installation directory (using `$` to denote the shell prompt):

`$ bin/pyspark`

Set up a minimal SparkSession:
- using the local machine as master
- naming the app

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local") \
        .appName("pspark_test") \
        .getOrCreate()

In [2]:
# print info about the session
spark

In [5]:
sc = spark.sparkContext

### RDDs (Resilient Distributed Datasets)

RDDs are the most basic abstraction in Spark. They have these properties:

- **resilient**: each RDD records its list of dependencies (its lineage), which tells Spark how the RDD is constructed from its inputs.  
  If part of an RDD is lost, for example when a node fails, Spark can recompute it from those dependencies.
  
- **partitioned**: the data is stored in pieces called `partitions`. Spark automatically partitions RDDs and distributes the partitions across nodes in the cluster.
  
- **distributed**: spreading the partitions across the nodes of the cluster makes it possible to store massive datasets that wouldn't fit on a single machine.

**RDD History**

Before Spark 2.0, the main programming interface of Spark was the *Resilient Distributed Dataset (RDD)*.  

Starting with Spark 2.0, the *Dataset* and *DataFrame* APIs became the primary interface. We discuss these later.

The RDD interface is still supported.

### Computing

**Example 1: Read lines from text file**

In [11]:
data_filename = 'README.txt'

In [12]:
lines = sc.textFile(data_filename)

In [13]:
lines.count()

41

In [14]:
lines.first()

'Apache Spark'

In [15]:
lines.collect()

['Apache Spark',
 'Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for stream processing.',
 '',
 'https://spark.apache.org/',
 '',
 'Jenkins Build AppVeyor Build PySpark Coverage',
 '',
 'Online Documentation',
 'You can find the latest Spark documentation, including a programming guide, on the project web page. This README file only contains basic setup instructions.',
 '',
 'Building Spark',
 'Spark is built using Apache Maven. To build Spark and its example programs, run:',
 '',
 './build/mvn -DskipTests clean package',
 '(You do not need to do this if you downloaded a pre-built package.)',
 '',
 'You can build Spark using more than one thread by u

In [16]:
type(lines.collect())

list

In [17]:
lines.collect()[1]

'Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for stream processing.'

In [18]:
type(lines.collect()[0])

str

**Example 2: Text Search - find all lines containing “Spark”**

In [24]:
spark_lines = lines.filter(lambda x: "Spark" in x)

In [25]:
# return list of first 5 records
spark_lines.take(5)   

['Apache Spark',
 'Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for stream processing.',
 'Jenkins Build AppVeyor Build PySpark Coverage',
 'You can find the latest Spark documentation, including a programming guide, on the project web page. This README file only contains basic setup instructions.',
 'Building Spark']

In [27]:
type(spark_lines)

pyspark.rdd.PipelinedRDD

**Example 3: Word Count**

In [None]:
# Read the file into an RDD
lines = sc.textFile(data_filename)

In [None]:
type(lines)

In [None]:
words = lines.flatMap(lambda x: x.split())

In [None]:
words.take(5)

In [None]:
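# Pair each word with 1, add up the counts per word,
# swap to (count, word), and sort by count in descending order
wordcounts = (words.map(lambda x: (x, 1))
                   .reduceByKey(lambda x, y: x + y)
                   .map(lambda x: (x[1], x[0]))
                   .sortByKey(False))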

In [None]:
wordcounts.take(10)

**TRY FOR YOURSELF (UNGRADED EXERCISES)**

1) Convert the Word Count example into a function called `word_count()`.

The function assumes you have already read the text file into `lines`.  
It should take two inputs: 
- `lines`  this is the RDD containing text
- `num_records`  this is the number of wordcount pairs to return

It should return a list of the `num_records` most frequent (count, word) pairs.  
Enter the code for `word_count()` in the cell below.

In [None]:
## definition of word_count()


Now test that `word_count()` returns the expected result.  
Also ensure that the output type is a list.

In [None]:
## test function: word_count()
## calling type(output) should return a list


**SOLUTIONS**

In [None]:
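def word_count(lines, num_records):
    """Return the num_records most frequent (count, word) pairs from an RDD of text lines."""
    wordcounts = (lines.flatMap(lambda x: x.split())     # split lines into words
                       .map(lambda x: (x, 1))            # pair each word with 1
                       .reduceByKey(lambda x, y: x + y)  # sum the counts per word
                       .map(lambda x: (x[1], x[0]))      # swap to (count, word)
                       .sortByKey(False))                # sort by count, descending

    return wordcounts.take(num_records)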

In [None]:
out = word_count(lines, 10)
out

In [None]:
type(out)