# 1. Introduction to Apache Spark
This lab shows you how to work with Apache Spark using Python (PySpark).

### SparkContext is the entry point for a Spark application. It sets up internal services and establishes a connection to the underlying Spark execution environment.

#### Step 1 - Working with Spark Context
- Use the Spark context to check which version of Spark the driver application is running.

Type:<br>
sc.version

In [None]:
#Step 1.1 - Check spark version


### 2. Resilient Distributed Datasets (RDDs) are the fundamental data structure in Spark. An RDD is an immutable, distributed collection of objects that is divided into logical partitions, which may be computed on different nodes in the cluster.
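To make "logical partitions" concrete, here is a plain-Python sketch (not Spark code) that slices a local list into contiguous chunks, roughly the way `sc.parallelize` spreads a local collection across partitions. The helper name `partition` and the choice of 3 partitions are illustrative assumptions.

```python
def partition(data, num_partitions):
    """Hypothetical helper: split a list into num_partitions contiguous slices.

    This only mimics the idea behind sc.parallelize(data, num_partitions);
    it is not Spark's implementation.
    """
    n = len(data)
    return [
        data[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


numbers = list(range(1, 11))
parts = partition(numbers, 3)
print(parts)  # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
```

In Spark, each slice could be processed by a different executor. In PySpark, `sc.parallelize(x, 3).glom().collect()` shows the actual contents of each partition.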

#### Step 2 - Working with Resilient Distributed Datasets

2.1 Create an RDD with the numbers 1 to 10,<br>
2.2 Extract the first element,<br>
2.3 Extract the first 5 elements,<br>
2.4 Add 1 to each element in the numbers RDD, <br>
2.5 Inspect the new RDD (with 1 added to each element in the original RDD), <br>
2.6 Create an RDD with the string "Hello Spark!" and extract the first element, <br>
2.7 Create an RDD with multiple string values, <br>
2.8 Count the number of entries in the RDD, <br>
2.9 Show all entries in the RDD, <br>
2.10 Split individual entries in the RDD based on the comma (,) delimiter, <br>
2.11 Use the flatMap RDD function to split rows with multiple entries into individual elements, <br>
2.12 Map each element of the flatMap RDD to a (word, 1) pair, <br>
2.13 Perform key-based aggregation using the reduceByKey RDD function. <br>

Type: <br>
x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]<br>
x_nbr_rdd = sc.parallelize(x)<br>

In [None]:
#2.1 - Create RDD of Numbers 1-10


Type: <br>
x_nbr_rdd.first()

In [None]:
#2.2 - Extract the first element


Type:<br>
x_nbr_rdd.take(5)
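`first()` and `take(n)` only need the beginning of the dataset, while `collect()` needs every element. The plain-Python sketch below mimics that difference over a list of partitions; the function names mirror the RDD actions but are local stand-ins, and the partition layout is an assumption.

```python
from itertools import chain, islice

# Assumed layout: the numbers 1-10 spread over three partitions.
partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]


def take(parts, n):
    """Return the first n elements, reading partitions in order and stopping early."""
    return list(islice(chain.from_iterable(parts), n))


def first(parts):
    """Return the first element (local stand-in for RDD.first())."""
    return take(parts, 1)[0]


def collect(parts):
    """Return every element from every partition (local stand-in for RDD.collect())."""
    return list(chain.from_iterable(parts))


print(first(partitions))    # 1
print(take(partitions, 5))  # [1, 2, 3, 4, 5]
```

Because `take` stops early, it never reads the third partition here. Spark's own `take` likewise tries to scan only as many partitions as it needs, whereas `collect()` always brings every partition to the driver.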

In [None]:
#2.3 - Extract the first 5 elements


Perform your first map transformation and replace each element X in the RDD with X+1.<br>
Type:<br>
x_nbr_rdd_2 = x_nbr_rdd.map(lambda x: x+1)
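Two properties of `map` are worth seeing side by side: it produces a new dataset instead of modifying the old one, and, as a transformation, it does no work until an action such as `collect()` runs. Python's built-in `map` happens to be lazy too, so this plain-Python sketch (an analogy, not Spark code) shows both ideas.

```python
x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Analogy for x_nbr_rdd.map(lambda x: x + 1): nothing is computed yet,
# Python only builds a lazy iterator (Spark only records the transformation).
lazy_plus_one = map(lambda v: v + 1, x)

# Analogy for collect(): forcing the computation produces a NEW list.
x_2 = list(lazy_plus_one)

print(x)    # the original list is unchanged: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print(x_2)  # [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
```

One difference: a Python iterator can be consumed only once, whereas an RDD can be re-evaluated from its lineage by every action.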

In [None]:
#2.4 - Perform your first map transformation. Replace each element X in the RDD with X+1.
#Remember that RDDs are IMMUTABLE, so it is not possible to UPDATE an RDD. You need to create
#a NEW RDD


Take a look at the elements of the new RDD.<br>
Type:<br>
x_nbr_rdd_2.collect()   

In [None]:
#2.5 - Check out the elements of the new RDD. Warning: be careful with collect() on real data! It brings
#all elements of the RDD (from all partitions) to the driver.


Let's now create a new RDD with one string, "Hello Spark!", and take a look at it.<br>
Type:<br>
y = ["Hello Spark!"]<br>
y_str_rdd = sc.parallelize(y)<br>
y_str_rdd.first()<br>

In [None]:
#2.6 - Create String RDD, Extract first line


Let's now create a third RDD with several strings.<br>
Type:<br>
z = ["First,Line", "Second,Line", "and,Third,Line"]<br>
z_str_rdd = sc.parallelize(z)<br>
z_str_rdd.first()

In [None]:
#2.7 - Create String RDD with many lines / entries, Extract first line


Count the number of entries in this RDD.<br>
Type:<br>
z_str_rdd.count()

In [None]:
#2.8 - Count the number of entries in the RDD


Take a look at the elements of this RDD.<br>
Type:<br>
z_str_rdd.collect()

In [None]:
#2.9 - Show all the entries in the RDD. Warning: be careful with collect() on real data!
#It brings all elements of the RDD (from all partitions) to the driver.


In the next step, we will split all the entries in the RDD on the commas "," <br>
Type: <br>
z_str_rdd_split = z_str_rdd.map(lambda line: line.split(","))<br>
z_str_rdd_split.collect()

In [None]:
#2.10 - Perform a map transformation to split all entries in the RDD on the commas ",".

#Check out the entries in the new RDD

#Notice how the entries in the new RDD are now LISTS of elements: the original
#strings have been split using the comma delimiter.

In this step, we will learn a new transformation besides map: flatMap.<br>
flatMap applies a function to each RDD entry and then "flattens" the results, so each subcomponent becomes a separate entry in the new RDD.<br>
This is easier to understand with an example.<br>
Type:<br>
z_str_rdd_split_flatmap = z_str_rdd.flatMap(lambda line: line.split(","))<br>
z_str_rdd_split_flatmap.collect()
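The difference between `map` and `flatMap` can be reproduced with plain Python lists: `map` yields exactly one output entry per input entry (here, a list), while `flatMap` concatenates those lists so that every token becomes its own entry. This is a local analogy, not Spark code.

```python
from itertools import chain

z = ["First,Line", "Second,Line", "and,Third,Line"]

# map: one output entry per input line; each entry is a list of tokens
mapped = [line.split(",") for line in z]

# flatMap: the per-line lists are flattened into one sequence of tokens
flat_mapped = list(chain.from_iterable(line.split(",") for line in z))

print(len(mapped), mapped)
# 3 [['First', 'Line'], ['Second', 'Line'], ['and', 'Third', 'Line']]
print(len(flat_mapped), flat_mapped)
# 7 ['First', 'Line', 'Second', 'Line', 'and', 'Third', 'Line']
```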

In [None]:
#2.11 - Learn the difference between two transformations: map and flatMap.
#Go back to the RDD z_str_rdd_split defined above using a map transformation from z_str_rdd
#and this time use flatMap instead.


#What do you notice? How is z_str_rdd_split_flatmap different from z_str_rdd_split?

In this step, we will augment each entry in the previous RDD with the number "1" to create pairs (or tuples). The first element of each tuple will be the word and the second element will be the number 1.<br>
This is a common technique for counting elements with Spark.<br>
Type:<br>
countWords = z_str_rdd_split_flatmap.map(lambda word:(word,1))<br>
countWords.collect()

In [None]:
#2.12 - Map each word to a (word, 1) tuple to create a PAIR RDD.
#Then check out the entries in the new RDD countWords.


What we have above is known as a PAIR RDD. Each entry in the RDD has a KEY and a VALUE.<br>
The KEY is the word (First, Line, etc...) and the value is the number "1"<br>
We can now AGGREGATE this RDD by summing up all the values BY KEY<br>
Type:<br>
from operator import add<br>
countWords2 = countWords.reduceByKey(add)<br>
countWords2.collect()<br>
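Conceptually, `reduceByKey(add)` first combines the values for each key inside every partition and then merges those partial results across partitions (which is where the shuffle happens). The sketch below reproduces those two phases in plain Python; `reduce_by_key` is a hypothetical helper and the two-partition split is an assumption.

```python
from operator import add


def reduce_by_key(partitions, func):
    """Hypothetical two-phase reduce: combine within partitions, then merge."""
    # Phase 1: local combine inside each partition
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)
    # Phase 2: merge the per-partition results key by key
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


words = ["First", "Line", "Second", "Line", "and", "Third", "Line"]
pairs = [(word, 1) for word in words]   # like countWords: (word, 1) tuples
partitions = [pairs[:4], pairs[4:]]     # assumed split over two partitions
print(reduce_by_key(partitions, add))
# {'First': 1, 'Line': 3, 'Second': 1, 'and': 1, 'Third': 1}
```

Because the function may be applied to values in any grouping and order, it should be associative and commutative, as addition is.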

In [None]:
#2.13 - Check out the results of the aggregation

#You just created an RDD countWords2 which contains the counts for each token...

### 3. I/O with Apache Spark
Spark supports a wide range of input and output sources, partly because it builds on the ecosystem available for Hadoop. In particular, Spark can access data through the InputFormat and OutputFormat interfaces used by Hadoop MapReduce, which are available for many common file formats and storage systems (e.g., S3, HDFS, Cassandra, HBase, etc.).

#### Step 3 - Count number of lines with Spark in it
3.1 Pull in a Spark README.md file from a remote site (GitHub in this case), <br>
3.2 Convert the file to an RDD,<br>
3.3 Create a new RDD by filtering out lines that don't contain the word "Spark", <br>
3.4 Count the total number of lines in the file and the number of lines with the word "Spark" in them, <br>
3.5 Count the number of instances of each token starting with "Spark", <br>
3.6 Display the tokens which contain the substring "Spark". <br>

Type:<br>
!rm README.md* -f<br>
!wget https://raw.githubusercontent.com/carloapp2/SparkPOT/master/README.md<br>


In [None]:
#3.1 - Pull data file into workbench


Now we will point Spark to the text file stored in the local filesystem and use the "textFile" method to create an RDD named "textfile_rdd", which will contain one entry for each line in the original text file.<br>
We will also count the number of entries in the RDD (which is also the number of lines in the text file).<br>
Type:<br>
textfile_rdd = sc.textFile("README.md")<br>
textfile_rdd.count()<br>

In [None]:
#3.2 - Create RDD from data file


Let us now filter the RDD and only keep the entries that contain the token "Spark". This will be achieved using the "filter" transformation, combined with the Python syntax for checking whether a substring is present within a larger string: substring in string.<br>
We will also take a look at the first line in the newly filtered RDD. <br>
Type:<br>
spark_lines = textfile_rdd.filter(lambda line: "Spark" in line)<br>
spark_lines.first()<br>
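`filter` keeps an entry only when the predicate returns True, and `"Spark" in line` is a plain substring test, so it also matches words such as "PySpark". The sample lines below are made up for illustration; they are not the contents of the downloaded README.md.

```python
# Hypothetical sample lines standing in for textfile_rdd
sample_lines = [
    "# Apache Spark",
    "",
    "Spark is a cluster computing engine.",
    "It has APIs in several languages.",
    "Notebook users often start with PySpark.",
]

# Local analogy for textfile_rdd.filter(lambda line: "Spark" in line)
spark_lines = [line for line in sample_lines if "Spark" in line]

print(len(spark_lines), "of", len(sample_lines), "lines contain 'Spark'")
# 3 of 5 lines contain 'Spark'
print(spark_lines[0])  # like spark_lines.first(): # Apache Spark
```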

In [None]:
#3.3 - Filter for only lines with word Spark


Perform additional transformations on the text file we read:<br>
-> Remove empty lines<br>
-> Split each remaining line into words<br>
-> Extract the first word of each line and count how often each first word occurs<br>

Type:<br>
tokenized = textfile_rdd.filter(lambda line: len(line) > 0).map(lambda line: line.split(" "))<br>
counts = tokenized.map(lambda words: (words[0], 1)).reduceByKey(lambda a, b: a + b)<br>
counts.collect()
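The pipeline above (drop empty lines, split on spaces, key each line by its first word, sum the 1s) can be traced locally with `collections.Counter`, which does the same counting in a single process. The sample lines are hypothetical.

```python
from collections import Counter

# Hypothetical input lines (one entry per line, as textFile would produce)
lines = [
    "Spark runs on clusters.",
    "",
    "Spark has a Python API.",
    "Python users call it PySpark.",
]

non_empty = [line for line in lines if len(line) > 0]          # filter(len(line) > 0)
tokenized = [line.split(" ") for line in non_empty]            # map(split(" "))
first_word_counts = Counter(words[0] for words in tokenized)   # map((words[0], 1)) + reduceByKey

print(first_word_counts)  # Counter({'Spark': 2, 'Python': 1})
```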

We will now count the number of entries in the filtered RDD and present the result as a concatenated string.<br>
Type:<br>
print("The file README.md has " + str(spark_lines.count()) +<br>
" of " + str(textfile_rdd.count()) +<br>
" lines with the word Spark in them.")<br>

In [None]:
#3.4 - count the number of lines


Using your knowledge from the previous exercises, you will now count how many times each token starting with "Spark" appears in the lines that contain "Spark".<br>
Instructions:<br>
Looking back at previous exercises, you will need to: <br>
1- Execute a flatMap transformation on the RDD spark_lines and split on spaces.<br>
2- Augment each token with the digit "1", so as to obtain a PAIR RDD where the first element of the tuple is the token and the second element is the digit "1".<br>
3- Execute a reduceByKey with addition (the add function from the operator module) to count the number of instances of each token.<br>
4- Filter the resulting RDD from Step 3 above to only keep entries whose token starts with "Spark".<br> In Python, the syntax to decide whether a string starts with a token is string.startswith("token"). <br>
5- Display the resulting list of tokens which start with "Spark", together with their counts.

Type:<br>
from operator import add<br>
spark_lines_flatmap = spark_lines.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)<br>
spark_lines_flatmap.filter(lambda kv: kv[0].startswith("Spark")).collect()

In [None]:
#3.5 - count the number of instances of tokens starting with "Spark"


As a slight modification of the cell above, let us now filter and display the tokens which contain the substring "Spark" (instead of only those which START with it). Your result should be a superset of the previous result. <br>
The Python syntax to determine whether a string contains a particular "token" is: "token" in string<br>

Type:<br>
spark_lines_flatmap.filter(lambda kv: "Spark" in kv[0]).collect()
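To see why the "contains" result is a superset of the "starts with" result, here is the same token count done locally on a few made-up lines. Note that splitting on a single space leaves punctuation attached, so "Spark," is counted as a separate token from "Spark".

```python
from collections import Counter

# Hypothetical lines standing in for spark_lines
lines = ["Spark and PySpark", "Spark SQL", "Apache Spark, SparkContext"]

# Local analogy for flatMap(split(" ")) + map((x, 1)) + reduceByKey(add)
token_counts = Counter(tok for line in lines for tok in line.split(" "))

starts_with = {k: v for k, v in token_counts.items() if k.startswith("Spark")}
contains = {k: v for k, v in token_counts.items() if "Spark" in k}

print(starts_with)  # {'Spark': 2, 'Spark,': 1, 'SparkContext': 1}
print(contains)     # the same entries plus 'PySpark': 1
```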

In [1]:
#3.6 - Display the tokens which contain the substring "Spark" in them.


### 4. RDD Persistence
One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, to persist it in memory as serialized Java objects (to save space), or to replicate it across nodes. The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). In PySpark, stored objects are always serialized with Pickle, which is why the cached RDDs below are reported as "Memory Serialized".
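The effect of `cache()` on recomputation can be imitated with a small toy class that records a lineage of lazy transformations and, when marked, keeps its computed result. `LazyDataset` and `read_source` are hypothetical names for this sketch; real Spark caches individual partitions on the executors, which this single-process toy does not model.

```python
class LazyDataset:
    """Toy stand-in for an RDD: remembers HOW to compute its data (its lineage)."""

    def __init__(self, compute):
        self._compute = compute      # zero-argument function producing the elements
        self._should_cache = False
        self._cached = None

    def filter(self, predicate):
        # New dataset whose computation re-runs this (parent) dataset's computation
        return LazyDataset(lambda: [x for x in self._materialize() if predicate(x)])

    def cache(self):
        # Only marks the dataset; nothing is computed yet (cache() is lazy in Spark too)
        self._should_cache = True
        return self

    def _materialize(self):
        if self._cached is not None:
            return self._cached
        data = self._compute()
        if self._should_cache:
            self._cached = data
        return data

    def collect(self):
        return list(self._materialize())


reads = {"count": 0}


def read_source():
    """Hypothetical data source that records how often it is read."""
    reads["count"] += 1
    return ["Spark is fast", "Hello world", "PySpark"]


textfile = LazyDataset(read_source)
spark_lines = textfile.filter(lambda line: "Spark" in line).cache()
spark_lines.collect()
spark_lines.collect()
print(reads["count"])  # 1: the second collect() reused the cached result

uncached = textfile.filter(lambda line: "Spark" in line)
uncached.collect()
uncached.collect()
print(reads["count"])  # 3: each collect() on the uncached dataset re-read the source

# Rebinding the name does not affect spark_lines, which keeps its own lineage
textfile = LazyDataset(lambda: list(range(1, 11)))
print(spark_lines.collect())  # ['Spark is fast', 'PySpark']
```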



Let us inspect the execution plan (lineage graph) for reading the README.md file. It should look something like this:
<br>
(2) MapPartitionsRDD[159] at textFile at NativeMethodAccessorImpl.java:-2 []<br>
|  README.md HadoopRDD[158] at textFile at NativeMethodAccessorImpl.java:-2 []<br>
<br>
Type:<br>
textfile_rdd.toDebugString()

Let us inspect the execution plan (lineage graph) for reading the lines that contain the word "Spark". It should look something like this:
<br>
(2) PythonRDD[170] at RDD at PythonRDD.scala:43 []<br>
|  MapPartitionsRDD[159] at textFile at NativeMethodAccessorImpl.java:-2 []<br>
|  README.md HadoopRDD[158] at textFile at NativeMethodAccessorImpl.java:-2 []<br>
<br>
Type:<br>
spark_lines.toDebugString()

Let us now persist/cache the intermediate results (the RDD with lines that contain the word "Spark"). Once the next action has computed it, subsequent accesses will not need to replay the entire lineage of reading and filtering the file.

Type:<br>
spark_lines.cache()

Let us inspect the execution plan after caching the data. It should look something like this.<br>
(2) PythonRDD[170] at RDD at PythonRDD.scala:43 [Memory Serialized 1x Replicated]<br>
|  MapPartitionsRDD[159] at textFile at NativeMethodAccessorImpl.java:-2 [Memory Serialized 1x Replicated]<br>
|  README.md HadoopRDD[158] at textFile at NativeMethodAccessorImpl.java:-2 [Memory Serialized 1x Replicated]<br>

Type:<br>
spark_lines.toDebugString()

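Let us now rebind the textfile_rdd variable to a new RDD holding some dummy data. Because RDDs are immutable, this does not change the spark_lines RDD: it still refers to its original lineage, and since it is marked for caching, the first action on it stores its partitions in memory so that later actions can reuse them instead of re-reading README.md.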

<br>
Type:<br>
dummy_data=[1,2,3,4,5,6,7,8,9,10]<br>

textfile_rdd=sc.parallelize(dummy_data)<br>
textfile_rdd.collect()<br>
spark_lines.collect()<br>

### 5. Spark SQL
Spark SQL is Spark’s interface for working with structured and semistructured data. Structured data is any data that has a schema—that is, a known set of fields for each record. When you have this type of data, Spark SQL makes it both easier and more efficient to load and query. In particular, Spark SQL provides three main capabilities:
a. It provides a DataFrame abstraction that simplifies working with structured datasets. DataFrames are similar to tables in a relational database.
b. It can read and write data in a variety of structured formats (e.g., JSON, Hive Tables, and Parquet).
c. It lets you query the data using SQL, both inside a Spark program and from external tools that connect to Spark SQL through standard database connectors (JDBC/ODBC), such as business intelligence tools like Tableau.

5.1 Create SQL context, <br>
5.2 Download sample JSON data, <br>
5.3 Create data frame, <br>
5.4 Inspect data frame schema, <br>
5.5 Inspect contents of data frame, <br>
5.6 Use SQL to select contents of the data frame, <br>
5.7 Convert to Pandas data frame, <br>
5.8 Perform aggregation (group by) using SQL Select, <br>
5.9 Read nested data using SQL, <br>
5.10 Create simple plots, <br>
5.11 Dynamic schema assignment <br>


#### 5.1 Getting started: Create a SQL Context
Type:<br>
from pyspark.sql import SQLContext<br>
sqlContext = SQLContext(sc)

In [2]:
#Create the SQLContext


#### 5.2 Download sample JSON data
Let's download the data. We can run commands on the console of the server (or Docker image) that the notebook environment is using: to do so, we simply put a "!" in front of the command that we want to run. For example:<br>
!pwd<br>
To get the data, we will download a file to the environment. Simply run these two commands; the first just ensures that the file is removed if it already exists:
<br>
Type:<br>
!rm world_bank.json.gz -f<br>
!wget https://raw.githubusercontent.com/bradenrc/sparksql_pot/master/world_bank.json.gz

In [3]:
#enter the commands to remove and download file here


#### 5.3 Create a Dataframe
Now you can create the Dataframe. If you want to see where you downloaded the file, you can run !pwd or !ls.<br>
To create the Dataframe:<br>
Type:<br>
example1_df = sqlContext.read.json("world_bank.json.gz")
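By default, `sqlContext.read.json` expects JSON Lines input (one complete JSON object per line), and Spark decompresses `.gz` files based on the file extension. Assuming `world_bank.json.gz` follows that layout, the plain-Python sketch below writes and reads a tiny file in the same format with `gzip` and `json`. The records and field values are hypothetical, not taken from the World Bank file.

```python
import gzip
import json
import os
import tempfile

# Hypothetical records; the real file has many more fields
records = [
    {"countryname": "Country A", "regionname": "Region X"},
    {"countryname": "Country B", "regionname": "Region X"},
    {"countryname": "Country C", "regionname": "Region Y", "project_name": "Project 1"},
]

path = os.path.join(tempfile.mkdtemp(), "sample.json.gz")

# Write JSON Lines: one complete JSON object per line, gzip-compressed
with gzip.open(path, "wt", encoding="utf-8") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

# Read it back line by line, as a line-oriented reader would
with gzip.open(path, "rt", encoding="utf-8") as f:
    rows = [json.loads(line) for line in f]

# Records can have different fields, so an inferred schema needs their union
all_fields = sorted(set().union(*(row.keys() for row in rows)))
print(all_fields)  # ['countryname', 'project_name', 'regionname']
```

Spark's JSON schema inference likewise merges the fields seen across records and fills a field that is missing from a record with null.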

In [None]:
#create the Dataframe here:


#### 5.4 Inspect data frame schema
<br>
Type:<br>
example1_df.printSchema()

In [4]:
#print out the schema


#### 5.5 Inspect contents of data frame
<br>
Type:<br>
example1_df.take(2)

In [None]:
#Use take on the dataframe to pull out 2 rows


#### 5.6 Use SQL to select contents of the data frame

Using
DataframeObject.registerTempTable("name_of_table")

Create a table named "world_bank"

Type:<br>
example1_df.registerTempTable("world_bank")<br>
temp_df = sqlContext.sql("select * from world_bank limit 2")<br>
temp_df.collect()


In [None]:
#Create the table to be referenced via SQL

#Use SQL to select from table limit 2 and print the output


#### 5.7 Convert to Pandas data frame

Using
DataframeObject.toPandas()
<br>
Type:<br>
print(type(temp_df))<br>
temp_df.toPandas()

In [None]:
#Extra credit, take the Dataframe you created with the two records and convert it into Pandas


#### 5.8 Perform aggregation (group by) using SQL Select

Using
DataframeObject = sqlContext.sql("SELECT STATEMENT")<br>
DataframeObject.collect()

<br>
Type:<br>
new_df = sqlContext.sql("select regionname, count(*) from world_bank group by regionname")<br>
new_df.collect()

In [None]:
#Now Calculate a Simple count based on a group, for example "regionname"


#### 5.9 Read nested JSON data using SQL Select

Using
sqlContext.sql("SELECT NESTED.COLUMN_NAME FROM SQL TABLE STATEMENT").collect()
<br>
Type:<br>
sqlContext.sql("select sector.Name from world_bank limit 2").collect()
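Dot notation reaches into nested JSON. Assuming `sector` is an array of objects that each carry a `Name` field, Spark SQL's `sector.Name` returns, for each row, the array of those `Name` values. The plain-Python equivalent below uses hypothetical rows.

```python
# Hypothetical rows: "sector" is a list of objects, each with a "Name"
rows = [
    {"countryname": "Country A", "sector": [{"Name": "Sector 1"}, {"Name": "Sector 2"}]},
    {"countryname": "Country B", "sector": [{"Name": "Sector 3"}]},
]

# Local analogy for: select sector.Name from world_bank limit 2
sector_names = [[s["Name"] for s in row["sector"]] for row in rows[:2]]
print(sector_names)  # [['Sector 1', 'Sector 2'], ['Sector 3']]
```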

In [None]:
# With JSON data you can reference the nested data
# If you look at the schema above, you can see that sector.Name is a nested column
# Select that column and limit to reasonable output (like 2)


#### 5.10 Create simple graphs/plots

<br>
Type:<br>
%matplotlib inline<br>
import matplotlib.pyplot as plt, numpy as np

In [None]:
# we need to tell the charting library (matplotlib) to display charts inline
# just run this paragraph


Type:<br>
query = "select count(*) as Count, countryname from world_bank group by countryname" <br>
chart1_df = sqlContext.sql(query).toPandas()<br>
print(chart1_df)<br>

In [None]:
# first write the SQL statement and look at the data, remember to add .toPandas() to have it look nice
# an even easier option is to create a variable and set it to the SQL statement
# for example: 


Type:<br>
chart1_df.plot(kind='bar', x='countryname', y='Count', figsize=(12, 5))

In [None]:
# now take the variable (or same sql statement) and use the method:


#### 5.11 Dynamic schema assignment

First, we need to create an RDD of pairs or triplets. This can be done using code (for loop) as
seen in the instructor's example, or more simply by assigning values to an array.

Type:<br>
array=[[1,1,1],[2,2,2],[3,3,3],[4,4,4],[5,5,5]] <br>
my_rdd = sc.parallelize(array)<br>
my_rdd.collect()<br>

In [None]:
# Default array defined below. Feel free to change as desired.


First, use the StructField method, following these steps:<br>
1- Define your schema columns as a string<br>
2- Build the schema object using StructField<br>
3- Apply the schema object to the RDD<br>
Note: The cell below is missing some code and will not run properly until the missing code has been completed.<br>

Type:<br>
from pyspark.sql.types import *<br>
schemaString = "col1 col2 col3"<br>
fields = [StructField(field_name, IntegerType(), True) for field_name in schemaString.split()]<br>
schema = StructType(fields)<br>

schemaExample = sqlContext.createDataFrame(my_rdd, schema)<br>

schemaExample.registerTempTable("my_rdd_table")<br>
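Conceptually, applying a `StructType` to an RDD pairs each position in a row with a named, typed, nullable field. The sketch below imitates that in plain Python so the steps are visible; the `Field` tuple and `apply_schema` helper are hypothetical stand-ins for `StructField` and `createDataFrame`, not their real implementation.

```python
from collections import namedtuple

# Hypothetical stand-in for StructField(name, dataType, nullable)
Field = namedtuple("Field", ["name", "py_type", "nullable"])

schema_string = "col1 col2 col3"
fields = [Field(name, int, True) for name in schema_string.split()]


def apply_schema(rows, fields):
    """Hypothetical helper: turn positional rows into dicts keyed by field name, checking types."""
    records = []
    for row in rows:
        if len(row) != len(fields):
            raise ValueError("row %r does not match %d fields" % (row, len(fields)))
        record = {}
        for field, value in zip(fields, row):
            if value is None and not field.nullable:
                raise ValueError("%s is not nullable" % field.name)
            if value is not None and not isinstance(value, field.py_type):
                raise TypeError("%s expects %s" % (field.name, field.py_type.__name__))
            record[field.name] = value
        records.append(record)
    return records


array = [[1, 1, 1], [2, 2, 2], [3, 3, 3], [4, 4, 4], [5, 5, 5]]
table = apply_schema(array, fields)
print(table[0])  # {'col1': 1, 'col2': 1, 'col3': 1}
```

PySpark's `createDataFrame` also checks values against the schema by default (its `verifySchema` parameter), so a string in an IntegerType column raises an error; with an RDD as input, that error may only surface once an action runs.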



In [None]:

# The schema is encoded in a string. Complete the string below

# MissingType() should be either StringType() or IntegerType(). Please replace as required.

# Apply the schema to the RDD.

# Register the DataFrame as a table. Add table name below as parameter to registerTempTable.


Type:<br>
sqlContext.sql("select * from my_rdd_table").toPandas()

In [None]:
# Run some select statements on your newly created DataFrame and display the output
