# Big Data Fundamentals with PySpark
There's been a lot of buzz about Big Data over the past few years, and it has finally become mainstream for many companies. But what is Big Data? This course covers the fundamentals of Big Data via PySpark. Spark is a "lightning fast cluster computing" framework for Big Data. It provides a general data processing engine and lets you run programs up to 100x faster in memory, or 10x faster on disk, than Hadoop. You’ll use PySpark, a Python package for Spark programming, and its powerful, higher-level libraries such as SparkSQL and MLlib (for machine learning). You will explore the works of William Shakespeare, analyze FIFA 2018 data and perform clustering on genomic datasets. At the end of this course, you will have gained an in-depth understanding of PySpark and its application to general Big Data analysis.

Instructor: Upendra Devisetty, Science Analyst at CyVerse

## $\star$ Introduction to Big Data Analysis with Spark
This chapter introduces the exciting world of Big Data, as well as the various concepts and different frameworks for processing Big Data. You will understand why Apache Spark is considered the best framework for Big Data.

#### The 3 Vs of Big Data
* The 3 Vs are used to describe big data's characteristics
* **Volume:** Size of the data 
* **Variety:** Different sources and formats of data
* **Velocity:** Speed at which the data is generated and available for processing

#### Big Data concepts and Terminology
* **Clustered computing:** collection of resources of multiple machines
* **Parallel computing:** a type of computation in which many calculations are carried out simultaneously
* **Distributed computing:** Collection of nodes (networked computers) that run in parallel
* **Batch processing:** Breaking the job into small pieces and running them on individual machines
* **Real-time processing:** Immediate processing of data
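
To make the batch and parallel ideas above concrete, here is a minimal single-machine sketch in plain Python (no Spark involved). It splits a job into chunks and hands each chunk to a worker thread, which stands in for one machine of a cluster. The chunk size and the helper names `make_chunks` and `chunk_sum` are illustrative assumptions, not part of any Spark API.

```python
from concurrent.futures import ThreadPoolExecutor


def make_chunks(data, size):
    """Split a list into consecutive pieces of at most `size` items."""
    return [data[i:i + size] for i in range(0, len(data), size)]


def chunk_sum(chunk):
    """The work done on one piece of the job."""
    return sum(chunk)


numbers = list(range(1, 101))
chunks = make_chunks(numbers, 25)

# Each worker thread stands in for one machine of a cluster
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(chunk_sum, chunks))

# Combine the partial results into the final answer
total = sum(partial_sums)
print(partial_sums, total)
```

The final answer is assembled from partial results, which is the same shape of computation Spark performs across the nodes of a cluster.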

#### Big Data processing systems
* **Hadoop/MapReduce:** Scalable and fault-tolerant framework; written in Java
    * Open source
    * Batch processing
* **Apache Spark:** General purpose and lightning fast cluster computing system
    * Open source
    * Suited for both batch and real-time data processing

#### Features of Apache Spark framework
* Distributed cluster computing framework
* Efficient in-memory computations for large scale data sets
* Lightning-fast data processing framework
* Provides support for Java, Scala, Python, R, and SQL

#### Spark modes of deployment
* **Local mode:** Single machine such as your laptop
    * Convenient for testing, debugging, and demonstration
* **Cluster mode:** Set of pre-defined machines
    * Good for production
* Typical workflow: Local $\Rightarrow$ clusters
    * During this transition, no code change is necessary

### PySpark: Spark with Python
#### What is Spark shell?
* Interactive environment for running Spark jobs
* Helpful for fast interactive prototyping
* Spark's shells allow interacting with data on disk or in memory across many machines or one, and Spark takes care of automatically distributing this processing
* Three different Spark shells:
    * Spark-shell for Scala
    * PySpark-shell for Python
    * SparkR for R
    
#### PySpark shell
* PySpark shell is the Python-based command line tool
* PySpark shell allows data scientists to interface with Spark data structures
* PySpark shell supports connecting to a cluster

#### Understanding SparkContext
* SparkContext is an entry point into the world of Spark
* An **entry point** is where control is transferred from the Operating system to the provided program.
    * An entry point is a way of connecting to Spark cluster
    * An entry point is "like a key to the house." 
* Access the SparkContext in the PySpark shell as a variable named `sc`

#### Inspecting SparkContext
* **Version:** to retrieve SparkContext version that you are currently running:
    * `sc.version`
* **Python Version:** to retrieve Python version *that SparkContext is currently using*
    * `sc.pythonVer`
* **Master:** URL of the cluster, or the string "local" when SparkContext runs in local mode
    * `sc.master`
    * If it returns `local[*]`, SparkContext is acting as a master on a local node, using all available threads on the computer where it is running.
    
#### Loading data in PySpark
* SparkContext's **`parallelize()`** method (used on a list)
    * For example, to create parallelized collections holding the numbers 1 to 5:
    * `rdd = sc.parallelize([1, 2, 3, 4, 5])`
* SparkContext's **`textFile()`** method (used on a file)
    * For example, to load a text file named `test.txt` using this method:
    * `rdd2 = sc.textFile("test.txt")`
    
```
# Print the version of SparkContext
print("The version of Spark Context in the PySpark shell is", sc.version)

# Print the Python version of SparkContext
print("The Python version of Spark Context in the PySpark shell is", sc.pythonVer)

# Print the master of SparkContext
print("The master of Spark Context in the PySpark shell is", sc.master)
```
***

```
# Create a Python list of numbers from 1 to 100 
numb = range(1, 101)

# Load the list into PySpark  
spark_data = sc.parallelize(numb)
```
***

```
# Load a local file into PySpark shell
lines = sc.textFile(file_path)
```

### Use of lambda function in Python
* Understanding PySpark becomes a lot easier if we understand functional programming principles in Python:
    * `lambda`
    * `map`
    * `filter`
* Python supports the creation of anonymous functions.
    * **Anonymous functions** are functions that are not bound to a name at runtime, using a construct called `lambda`
    * Lambda functions are very powerful and well-integrated into Python
    * Lambda is especially useful with `map()` and `filter()`
    * Like `def`, `lambda` creates a function to be called later in the program. However, it returns the function instead of assigning it to a name (i.e., the function is **anonymous**).
    * In practice, they are used as a way to inline a function definition, or to defer execution of code. 
    
#### Lambda function syntax
* Lambda function can be used whenever function objects are required. 
* They can have any number of arguments, but only one expression, and the expression is evaluated and returned.
* **The general syntax of the lambda function is:**

**`lambda arguments: expression`**

Examples:

```
double = lambda x: x * 2
print(double(3))
```
*** 

```
g = lambda x: x**3
print(g(10))
```

With `double` and `g` defined as `lambda x: x * 2` and `lambda x: x**3`, these examples print `6` and `1000`, respectively.

* No return statement for lambda
* Can put lambda function anywhere, without ever assigning it to a variable
* We use lambda functions when we require a nameless function for a short period of time
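
The last two points can be illustrated with a short sketch in which a lambda is used without ever being bound to a name. The `people` list and the variable names are illustrative assumptions.

```python
people = [("Sam", 23), ("Mary", 34), ("Peter", 25)]

# The lambda is passed inline as the sort key and never given a name
by_age = sorted(people, key=lambda person: person[1])

# A lambda can also be called immediately where it is defined
cube_of_four = (lambda x: x ** 3)(4)

print(by_age)
print(cube_of_four)
```

Passing the function inline keeps the sorting rule next to the call that uses it, which is the same style PySpark encourages for `map()` and `filter()`.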

#### Use of Lambda function in Python - map()
* `map()` function takes a function and a list and applies the function to each item (in Python 3 the result is an iterator, so it is wrapped in `list()` below to get a new list)
* General syntax of `map()`: 
    * **`map(function, list)`**
* Example of `map` with `lambda`:

```
items = [1, 2, 3, 4]
list(map(lambda x: x + 2, items))
```

result:

**`[3, 4, 5, 6]`**



#### Use of Lambda function in Python - filter()
* `filter()` function takes a function and a list and returns only the items for which the function evaluates as true (in Python 3 the result is an iterator, so it is wrapped in `list()` below)
* General syntax of filter():
    * **`filter(function, list)`**
* Example of `filter()` with `lambda`:

```
items = [1, 2, 3, 4]
list(filter(lambda x: (x%2 != 0), items))
```

result:

**`[1, 3]`**

```
# Print my_list in the console
print("Input list is", my_list)

# Square all numbers in my_list
squared_list_lambda = list(map(lambda x: x**2, my_list))

# Print the result of the map function
print("The squared numbers are", squared_list_lambda)
```
***

```
# Print my_list2 in the console
print("Input list is:", my_list2)

# Filter numbers divisible by 10
filtered_list = list(filter(lambda x: (x%10 == 0), my_list2))

# Print the numbers divisible by 10
print("Numbers divisible by 10 are:", filtered_list)
```

## $\star$ Chapter 2: Programming in PySpark RDDs
The main abstraction Spark provides is a resilient distributed dataset (RDD), which is the fundamental and backbone data type of this engine. This chapter introduces RDDs and shows how RDDs can be created and executed using RDD Transformations and Actions.

#### Introduction to PySpark RDD
In this chapter, we will start working with RDDs which are Spark's core abstraction for working with data. 

* **RDD** = **Resilient Distributed Datasets**
* RDDs are a collection of data distributed across the cluster
* RDD is the fundamental and backbone data type in PySpark

#### Decomposing RDDs
* Resilient Distributed Datasets
    * **Resilient:** ability to withstand failures
    * **Distributed:** spanning across multiple machines
    * **Datasets:** collection of partitioned data e.g., arrays, tables, tuples, etc.
    
#### Creating RDDs
* Three methods for creating RDDs
* **Parallelize:**
    * The **simplest method** to create RDDs is to take an existing collection of objects (for example a list, array, or set) and pass it to SparkContext's parallelize method.
* **External datasets:**
    * A **more common** way to create RDDs is to load data from external datasets such as:
        * files stored in HDFS
        * Objects in Amazon S3 bucket
        * lines in a text file
* **From existing RDDs**

#### Parallelized collection (parallelizing)
* RDDs are created from a list or a set using the SparkContext's `parallelize` method.

```
numRDD = sc.parallelize([1, 2, 3, 4])
helloRDD = sc.parallelize("Hello world")
type(helloRDD)
```

#### From external datasets
* Creating RDDs from external datasets is by far the most common method in PySpark
* `textFile()` for creating RDDs from external datasets
* For file README stored locally:
* `fileRDD = sc.textFile("README.md")`
* `type(fileRDD)`

#### Understanding Partitioning in PySpark
* Data partitioning is an important concept in Spark and understanding how Spark deals with partitions allows one to control parallelism.
* A **partition** is a logical division of a large distributed data set. 
* By default, Spark partitions the data at the time of creating RDD based on several factors such as available resources, external datasets, etc., but this behavior can also be controlled by passing a second argument: `numSlices` for `parallelize()` (the number of partitions to create) or `minPartitions` for `textFile()` (the minimum number of partitions to create)
* `parallelize()` method:
    * `numRDD = sc.parallelize(range(10), numSlices = 6)`
* `textFile()` method:
    * `fileRDD = sc.textFile("README.md", minPartitions = 6)`
* The number of partitions in an RDD can always be found by using the `getNumPartitions()` method
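
As a rough mental model of what a partition is, the following plain-Python sketch cuts a list into a requested number of contiguous pieces. The helper `split_into_partitions` is hypothetical, and the slicing scheme is an assumption chosen for simplicity; Spark's own partitioning logic may distribute elements differently.

```python
def split_into_partitions(data, num_partitions):
    """Slice `data` into `num_partitions` contiguous, nearly equal pieces."""
    n = len(data)
    # Boundary i marks where partition i starts and partition i - 1 ends
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    return [data[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]


partitions = split_into_partitions(list(range(10)), 6)
for index, part in enumerate(partitions):
    print(index, part)
```

Each piece could then be processed by a different worker, which is why the number of partitions bounds the parallelism available to a job.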

```
# Create an RDD from a list of words
RDD = sc.parallelize(["Spark", "is", "a", "framework", "for", "Big Data processing"])

# Print out the type of the created object
print("The type of RDD is", type(RDD))
```
***

```
# Print the file_path
print("The file_path is", file_path)

# Create a fileRDD from file_path
fileRDD = sc.textFile(file_path)

# Check the type of fileRDD
print("The file type of fileRDD is", type(fileRDD))
```
***

```
# Check the number of partitions in fileRDD
print("Number of partitions in fileRDD is", fileRDD.getNumPartitions())

# Create a fileRDD_part from file_path with 5 partitions
fileRDD_part = sc.textFile(file_path, minPartitions = 5)

# Check the number of partitions in fileRDD_part
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())
```

#### RDD Operations in PySpark
* RDDs in PySpark support two different types of operations:
    * Transformations
    * Actions
* **Transformations** are operations on RDDs that return a new RDD
    * follow lazy evaluation
    * basic RDD transformations:
        * `map()`
        * `filter()`
        * `flatMap()`
        * `union()`
* **Actions** are operations that perform some computation on the RDD and return a value
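
Lazy evaluation can be previewed without Spark, because Python's built-in `map()` and `filter()` are lazy too. The sketch below is only an analogy: `traced_square` and the `log` list are illustrative helpers that record when work actually happens.

```python
log = []


def traced_square(x):
    """Square x and record that the function actually ran."""
    log.append(x)
    return x * x


numbers = [1, 2, 3, 4]

# "Transformations": building the pipeline runs nothing yet
squared = map(traced_square, numbers)
large = filter(lambda x: x > 4, squared)
calls_before_action = len(log)

# "Action": asking for the results forces the whole pipeline to run
result = list(large)
calls_after_action = len(log)

print(calls_before_action, calls_after_action, result)
```

In the same spirit, Spark records the chain of transformations and only executes it when an action asks for a result.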

#### map() Transformation 
* The `map()` transformation applies a function to all elements in the RDD
* Example:

```
RDD = sc.parallelize([1, 2, 3, 4])
RDD_map = RDD.map(lambda x: x * x)
```

#### filter() Transformation
* The `filter()` transformation takes in a function and returns an RDD that only has elements that pass the condition.

```
RDD = sc.parallelize([1, 2, 3, 4])
RDD_filter = RDD.filter(lambda x: x > 2)
```

#### flatMap() Transformation 
* The `flatMap()` transformation is similar to the `map()` transformation, except that it can return multiple values for each element in the source RDD.
* A simple usage of `flatMap()` is splitting up an input string into words
* Even though the input RDD has 2 elements, for example, the output RDD now contains 5 elements:

```
RDD = sc.parallelize(["hello world", "how are you"])
RDD_flatmap = RDD.flatMap(lambda x: x.split(" "))
```
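
The difference between the two transformations can be checked in plain Python. This sketch is only an analogy that uses list comprehensions and `itertools.chain` in place of RDD methods.

```python
from itertools import chain

lines = ["hello world", "how are you"]

# map-like: one output element (a list of words) per input element
mapped = [line.split(" ") for line in lines]

# flatMap-like: the per-element lists are flattened into one sequence
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(len(mapped), mapped)
print(len(flat_mapped), flat_mapped)
```

The map-like version keeps two elements (each a list), while the flatMap-like version yields five separate words.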

#### union() Transformation
* The `union()` transformation returns the union of one RDD with another RDD
* Similar to pandas' `concat()`

```
inputRDD = sc.textFile("logs.txt")
errorRDD = inputRDD.filter(lambda x: "error" in x.split())
warningsRDD = inputRDD.filter(lambda x: "warnings" in x.split())
combinedRDD = errorRDD.union(warningsRDD)
```
### RDD Actions
* Operations that return a value after running a computation on the RDD
* Basic RDD Actions
    * **`collect()`:** returns all the elements of the dataset as a list
        * `RDD_map.collect()`
    * **`take(n)`:** returns a list with the first n elements of the dataset
        * `RDD_map.take(2)`
    * **`first()`:** returns the first element of the RDD
        * `RDD_map.first()`
    * **`count()`:** returns the number of elements in the RDD
        * `RDD_flatmap.count()`
        
```
# Create map() transformation to cube numbers
cubedRDD = numbRDD.map(lambda x: x**3)

# Collect the results
numbers_all = cubedRDD.collect()

# Print the numbers from numbers_all
for numb in numbers_all:
	print(numb)
```
*** 

```
# Filter the fileRDD to select lines with Spark keyword
fileRDD_filter = fileRDD.filter(lambda line: 'Spark' in line)

# How many lines contain the keyword Spark?
print("The total number of lines with the keyword Spark is", fileRDD_filter.count())

# Print the first four lines of fileRDD_filter
for line in fileRDD_filter.take(4): 
  print(line)
```

#### Working with Pair RDDs in PySpark
* Real life datasets are usually key/value pairs 
* Each row is a key and maps to one or more values
* **Pair RDD** is a special data structure to work with this kind of dataset
* **Pair RDD:** Key is the identifier and value is data

#### Creating pair RDDs
* Two common ways to create pair RDDs:
    * From a list of key-value tuples
    * From a regular RDD
* Get the data into key/value form for paired RDD

```
my_tuple = [('Sam', 23), ('Mary', 34), ('Peter', 25)]
pairRDD_tuple = sc.parallelize(my_tuple)
```
***

```
my_list = ['Sam 23', 'Mary 34', 'Peter 25']
regularRDD = sc.parallelize(my_list)
pairRDD_RDD = regularRDD.map(lambda s: (s.split(' ')[0], s.split(' ')[1]))
```

#### Transformations on pair RDDs
* All regular transformations work on pair RDDs
* Because they contain tuples, we need to pass functions that operate on key value pairs rather than on individual elements
* Examples of paired RDD Transformations:
    * **`reduceByKey()`**: Combine values with the same key
    * **`groupByKey()`**: Group values with the same key
    * **`sortByKey()`**: Return an RDD sorted by the key
    * **`join()`**: Join two pair RDDs based on their key
    
#### reduceByKey() transformation
* The most popular pair RDD transformation which combines values with the same key using a function
* It runs parallel operations for each key in the dataset
* `reduceByKey()` is a transformation and not an action, as datasets can have very large numbers of keys

```
regularRDD = sc.parallelize([("Messi", 23), ("Ronaldo", 34), ("Neymar", 22), ("Messi", 24)])
pairRDD_reducebykey = regularRDD.reduceByKey(lambda x,y : x + y)
pairRDD_reducebykey.collect()
```
* result: `[('Neymar', 22), ('Ronaldo', 34), ('Messi', 47)]`
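
What this transformation computes can be modelled on a single machine with a dictionary. The helper `reduce_by_key` below is a hypothetical plain-Python stand-in, not Spark's implementation, which combines values in parallel across partitions.

```python
def reduce_by_key(pairs, func):
    """Combine the values of each key with `func`, two values at a time."""
    combined = {}
    for key, value in pairs:
        if key in combined:
            combined[key] = func(combined[key], value)
        else:
            combined[key] = value
    return combined


pairs = [("Messi", 23), ("Ronaldo", 34), ("Neymar", 22), ("Messi", 24)]
totals = reduce_by_key(pairs, lambda x, y: x + y)
print(totals)
```

Keys that appear once keep their single value, while the two values for `"Messi"` are merged by the supplied function.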

#### sortByKey() transformation
* Sorting of data is necessary for many downstream applications
* We can sort pair RDDs as long as there is an ordering defined in the key.
* `sortByKey()` operation orders pair RDD by key
* It returns an RDD sorted by key in ascending or descending order

```
pairRDD_reducebykey_rev = pairRDD_reducebykey.map(lambda x: (x[1], x[0]))
pairRDD_reducebykey_rev.sortByKey(ascending=False).collect()
```
* result: `[(47, 'Messi'), (34, 'Ronaldo'), (22, 'Neymar')]`

#### groupByKey() transformation
* `groupByKey()` groups all the values with the same key in the pair RDD
* If the data is already keyed in the way that we want, the `groupByKey` operation groups all the values with the same key in the pair RDD.

```
airports = [("US", "JFK"), ("UK", "LHR"), ("FR", "CDG"), ("US", "SFO")]
regularRDD = sc.parallelize(airports)
pairRDD_group = regularRDD.groupByKey().collect()
for cont, air in pairRDD_group:
    print(cont, list(air))
```
* result:

```
FR ['CDG']
US ['JFK', 'SFO']
UK ['LHR']
```
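
For comparison, the same grouping can be sketched in plain Python with `collections.defaultdict`. This is only a single-machine analogy of the result, not how Spark shuffles data between nodes.

```python
from collections import defaultdict

airports = [("US", "JFK"), ("UK", "LHR"), ("FR", "CDG"), ("US", "SFO")]

# Collect every value under its key, preserving the order of appearance
grouped = defaultdict(list)
for country, airport in airports:
    grouped[country].append(airport)

for country in sorted(grouped):
    print(country, grouped[country])
```

Unlike a reduce-style combination, grouping keeps every individual value, so the grouped result can be much larger than a reduced one.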

#### join() transformation
* `join()` joins two pair RDDs based on their key 

```
RDD1 = sc.parallelize([("Messi", 34), ("Ronaldo", 32), ("Neymar", 24)])
RDD2 = sc.parallelize([("Ronaldo", 80), ("Neymar", 120), ("Messi", 100)])
RDD1.join(RDD2).collect()
```
* results: `[('Neymar', (24, 120)), ('Ronaldo', (32, 80)), ('Messi', (34, 100))]`
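
`join()` behaves like an inner join: only keys present in both inputs appear in the output. The plain-Python sketch below models that behaviour; the helper `inner_join` and the extra `"Mbappe"` entry are hypothetical additions used to show an unmatched key being dropped.

```python
def inner_join(left_pairs, right_pairs):
    """Pair up values from both inputs for every key they have in common."""
    right_by_key = {}
    for key, value in right_pairs:
        right_by_key.setdefault(key, []).append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left_pairs
        for right_value in right_by_key.get(key, [])
    ]


left = [("Messi", 34), ("Ronaldo", 32), ("Neymar", 24)]
right = [("Ronaldo", 80), ("Neymar", 120), ("Messi", 100), ("Mbappe", 50)]
joined = inner_join(left, right)
print(joined)
```

The order of the output here follows the left input; in Spark the order of joined elements is not guaranteed.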

#### reduceByKey

```
# Create PairRDD Rdd with key value pairs
Rdd = sc.parallelize([(1,2), (3,4), (3,6), (4,5)])

# Apply reduceByKey() operation on Rdd
Rdd_Reduced = Rdd.reduceByKey(lambda x, y: x+y)

# Iterate over the result and print the output
for num in Rdd_Reduced.collect(): 
  print("Key {} has {} Counts".format(num[0], num[1]))
```
***

```
# Sort the reduced RDD with the key by descending order
Rdd_Reduced_Sort = Rdd_Reduced.sortByKey(ascending=False)

# Iterate over the result and retrieve all the elements of the RDD
for num in Rdd_Reduced_Sort.collect():
  print("Key {} has {} Counts".format(num[0], num[1]))
```

### More actions

#### reduce() action
* `reduce(func)` action is used for aggregating the elements of a regular RDD
* The function should be *commutative* (changing the order of the operands does not change the result) and *associative* (changing the grouping of the operands does not change the result)

```
x = [1, 3, 4, 6]
RDD = sc.parallelize(x)
RDD.reduce(lambda x, y : x + y)
```
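
Why the function needs these two properties can be seen by simulating partitions in plain Python. In this sketch the helper `reduce_in_partitions` is hypothetical: it reduces each partition separately and then reduces the partial results, roughly as a distributed reduce would.

```python
from functools import reduce

values = [1, 3, 4, 6]


def reduce_in_partitions(partitions, func):
    """Reduce each partition first, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)


one_partition = [values]
two_partitions = [values[:2], values[2:]]

# Addition is commutative and associative: partitioning does not matter
add = lambda x, y: x + y
sum_one = reduce_in_partitions(one_partition, add)
sum_two = reduce_in_partitions(two_partitions, add)

# Subtraction is neither: the answer depends on how the data was split
sub = lambda x, y: x - y
diff_one = reduce_in_partitions(one_partition, sub)
diff_two = reduce_in_partitions(two_partitions, sub)

print(sum_one, sum_two, diff_one, diff_two)
```

Because the number and layout of partitions can vary between runs, a function without these properties could give inconsistent answers.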

#### saveAsTextFile() action
* In many cases, it is not advisable to run the `collect()` action on RDDs because of the huge size of the data
* In these cases, it's common to write data out to a distributed storage system such as HDFS or Amazon S3.
* `saveAsTextFile()` action saves RDD into a text file inside a directory with each partition as a separate file.
* Below is an example of `saveAsTextFile` that saves an RDD with each partition as a separate file inside a directory:

```
RDD.saveAsTextFile("tempFile")
```
* By default there is one output file per partition. To get a single file instead, first use the `coalesce()` method, which returns a new RDD reduced to the requested number of partitions
* With `coalesce(1)`, the RDD is saved as a single text file.

```
RDD.coalesce(1).saveAsTextFile("tempFile")
```

### Action Operations on pair RDDs
* RDD actions available for PySpark pair RDDs
* Pair RDD actions leverage the key-value data
* A few examples of pair RDD actions include:
    * `countByKey()`
    * `collectAsMap()`
    
#### countByKey() action
* `countByKey()` is only available on RDDs of type (Key, Value)
* `countByKey()` operation counts the number of elements for each key.

```
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
for kee, val in rdd.countByKey().items():
    print(kee, val)
```
* result:

```
a 2
b 1
```
* One thing to **note** is that `countByKey()` should only be used on a dataset whose size is small enough to fit in memory

#### collectAsMap() action
* `collectAsMap()` returns the key-value pairs in the RDD as a dictionary

```
sc.parallelize([(1, 2), (3,4)]).collectAsMap()
```
* result: `{1: 2, 3: 4}`
* **Note** this action should also only be used if the resulting data is expected to be small, because all the data is loaded into memory.
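
Both pair RDD actions have close single-machine analogues in plain Python, sketched here with `collections.Counter` and `dict`. The third pair `(3, 6)` is an illustrative addition that shows what happens when a key repeats.

```python
from collections import Counter

pairs = [("a", 1), ("b", 1), ("a", 1)]

# countByKey-like: count how many elements carry each key
key_counts = Counter(key for key, _ in pairs)

# collectAsMap-like: build a dictionary from the pairs; when a key
# repeats, only one of its values survives (here, the last one seen)
as_map = dict([(1, 2), (3, 4), (3, 6)])

print(dict(key_counts))
print(as_map)
```

Both results are ordinary in-memory Python objects, which is why these actions are only suitable when the result is small.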

#### Exercises: CountingBykeys
For many datasets, it is important to count the number of keys in a key/value dataset. For example, counting the number of countries where the product was sold or to show the most popular baby names. In this simple exercise, you'll use the `Rdd` that you created earlier and count the number of unique keys in that pair RDD.

Remember, you already have a SparkContext `sc` and `Rdd` available in your workspace.

```
# Count the unique keys
total = Rdd.countByKey()

# What is the type of total?
print("The type of total is", type(total))

# Iterate over the total and print the output
for k, v in total.items(): 
  print("key", k, "has", v, "counts")
```

#### Exercises: Create a base RDD and transform it
The volume of unstructured data (log lines, images, binary files) in existence is growing dramatically, and PySpark is an excellent framework for analyzing this type of data through RDDs. In this 3 part exercise, you will write code that calculates the most common words from [Complete Works of William Shakespeare](https://www.gutenberg.org/ebooks/100).

Here are the brief steps for writing the word counting program:
* Create a base RDD from `Complete_Shakespeare.txt` file.
* Use RDD transformation to create a long list of words from each element of the base RDD.
* Remove stop words from your data.
* Create pair RDD where each element is a pair tuple of `(w, 1)`, with `w` being a word
* Group the elements of the pair RDD by key (word) and add up their values.
* Swap the keys (word) and values (counts) so that the key is the count and the value is the word.
* Finally, sort the RDD by descending order and print the 10 most frequent words and their frequencies.
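
Before moving to Spark, the steps above can be rehearsed on a tiny in-memory example in plain Python. The two sample lines and the `stop_words` set below are hypothetical stand-ins for the text file and the curated list; the comments name the Spark operation each step corresponds to.

```python
from collections import Counter

# Hypothetical stand-ins for the text file and the stop-word list
lines = [
    "To be or not to be that is the question",
    "The question is what to be",
]
stop_words = {"to", "or", "not", "that", "is", "the", "what"}

# Split every line into words (the flatMap step)
words = [word for line in lines for word in line.split()]

# Drop stop words, comparing in lower case (the filter step)
kept = [word for word in words if word.lower() not in stop_words]

# Count occurrences of each word (the (w, 1) and reduceByKey steps)
counts = Counter(kept)

# Swap to (count, word) and sort in descending order (swap + sortByKey)
top = sorted(((n, w) for w, n in counts.items()), reverse=True)[:10]

for n, w in top:
    print(w, n)
```

The Spark version follows the same sequence, with each step expressed as an RDD transformation so that it can run across a cluster.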

In this first exercise, you'll create a base RDD from `Complete_Shakespeare.txt` file and transform it to create a long list of words.

Remember, you already have a SparkContext `sc` available in your workspace. A `file_path` variable (which is the path to the `Complete_Shakespeare.txt` file) is also loaded for you.

```
# Create a baseRDD from the file path
baseRDD = sc.textFile(file_path)

# Split the lines of baseRDD into words
splitRDD = baseRDD.flatMap(lambda x: x.split())

# Count the total number of words
print("Total number of words in splitRDD:", splitRDD.count())
```

#### Exercises: Remove stop words and reduce the dataset
After splitting the lines in the file into a long list of words in the previous exercise, in the next step, you'll remove stop words from your data. Stop words are common words that are often uninteresting. For example "I", "the", "a" etc., are stop words. You can remove many obvious stop words with a list of your own. But for this exercise, you will just remove the stop words from a curated list `stop_words` provided to you in your environment.

After removing stop words, you'll next create a pair RDD where each element is a pair tuple `(k, v)` where `k` is the key and `v` is the value. In this example, pair RDD is composed of `(w, 1)` where `w` is for each word in the RDD and 1 is a number. Finally, you'll combine the values with the same key from the pair RDD.

Remember you already have a SparkContext `sc` and `splitRDD` available in your workspace.

```
# Convert the words in lower case and remove stop words from the stop_words curated list
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in stop_words)

# Create a tuple of the word and 1 
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))

# Count of the number of occurrences of each word
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)
```

#### Exercises: Print word frequencies
After combining the values (counts) with the same key (word), in this exercise, you'll return the first 10 word frequencies. You could have retrieved all the elements at once using collect() but it is bad practice and not recommended. RDDs can be huge: you may run out of memory and crash your computer...

What if we want to return the top 10 words? For this, first you'll need to swap the key (word) and values (counts) so that the key is the count and the value is the word. After you swap the key and value in the tuple, you'll sort the pair RDD based on the key (count). This way it is easy to sort the RDD based on the count rather than the word using the `sortByKey()` operation in PySpark. Finally, you'll return the top 10 words from the sorted RDD.

You already have a SparkContext `sc` and `resultRDD` available in your workspace.

```
# Display the first 10 words and their frequencies from the input RDD
for word in resultRDD.take(10):
	print(word)

# Swap the keys and values from the input RDD
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))

# Sort the keys in descending order
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# Show the top 10 most frequent words and their frequencies from the sorted RDD
for word in resultRDD_swap_sort.take(10):
	print("{},{}".format(word[1], word[0]))
```


## $\star$ Chapter 3: PySpark SQL & DataFrames
In this chapter, you'll learn about Spark SQL which is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. This chapter shows how Spark SQL allows you to use DataFrames in Python.

### Abstracting Data with DataFrames
* **PySpark SQL** is Spark's high-level API for working with structured data.
* **PySpark SQL** is a Spark library for structured data. It provides more information about the structure of the data and computation.
* A **PySpark DataFrame** is an immutable distributed collection of data with named columns
    * Designed to process a large collection of both structured (e.g. relational database) and semi-structured data (e.g. JSON: JavaScript Object Notation)
    * Supports both SQL queries (`SELECT * from table`) and expression methods (`df.select()`)
    
#### SparkSession- Entry point for DataFrame API
* SparkContext is the main entry point for creating RDDs
* SparkSession provides a single point of entry to interact with Spark DataFrames
* The SparkSession does for DataFrames what the SparkContext does for RDDs
* SparkSession can be used to create DataFrames, register DataFrames, execute SQL queries
* SparkSession is available in PySpark shell as `spark`

#### Creating DataFrames in PySpark
* Two different methods of creating DataFrames in PySpark
    * From existing RDDs using SparkSession's `createDataFrame()` method
    * From various data sources (CSV, JSON, TXT) using SparkSession's read method
* **Schema** controls the data and helps DataFrames to optimize queries
* **Schema** provides informational detail such as the column name, the type of data in the column, empty values, etc.

#### Create a DataFrame from RDD

```
iphones_RDD = sc.parallelize([
    ("XS", 2018, 5.65, 2.79, 6.24),
    ("XR", 2018, 5.94, 2.98, 6.84),
    ("X10", 2017, 5.65, 2.79, 6.13),
    ("8Plus", 2017, 6.23, 3.07, 7.12)
])

names = ['Model', 'Year', 'Height', 'Width', 'Weight']

iphones_df = spark.createDataFrame(iphones_RDD, schema = names)
type(iphones_df)
```
* **NOTE:** When the schema is a list of column names, the type of each column will be inferred from data.
    * When the schema is `None`, Spark tries to infer both the column names and the types from the data.
    
#### Create a DataFrame from reading a CSV/JSON/TXT
* `df_csv = spark.read.csv('people.csv', header=True, inferSchema=True)`
* `df_json = spark.read.json("people.json")`
* `df_txt = spark.read.text("people.txt")`

* `spark.read.csv()` requires the path to the file and accepts two optional parameters (`spark.read.json()` and `spark.read.text()` do not take them):
    * `header`
    * `inferSchema` (default is False)
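
What these two options mean can be sketched with Python's standard `csv` module. This is a rough single-machine analogy, not Spark's reader: the in-memory file contents and the helper `infer` are hypothetical, and Spark infers a type per column rather than per value.

```python
import csv
import io

# Hypothetical file contents, held in memory so the sketch is self-contained
raw = "name,age\nSam,23\nMary,34\n"

# header=True equivalent: the first row supplies the column names
reader = csv.reader(io.StringIO(raw))
column_names = next(reader)
rows = list(reader)


def infer(value):
    """Very rough type inference: try int, then float, else keep the string."""
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value


# Without inference every field is a string; with it, ages become integers
typed_rows = [[infer(field) for field in row] for row in rows]

print(column_names)
print(rows)
print(typed_rows)
```

Without type inference every field stays a string, which is why numeric operations on a CSV-backed DataFrame usually need `inferSchema=True` or an explicit schema.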

#### Exercises:
```
# Create an RDD from the list
rdd = sc.parallelize(sample_list)

# Create a PySpark DataFrame
names_df = spark.createDataFrame(rdd, schema=['Name', 'Age'])

# Check the type of names_df
print("The type of names_df is", type(names_df))
```
***

```
# Create a DataFrame from file_path
people_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Check the type of people_df
print("The type of people_df is", type(people_df))
```

### Interacting with PySpark DataFrames
* Just like RDDs, DataFrames also support both transformations and actions

#### DataFrame operators in PySpark
* DataFrame operations: Transformations and Actions

#### DataFrame Transformations
* **`select('column')`** : transformation subsets one or more columns in the DataFrame
* **`filter(df.column <= condition)`** : transformation keeps only the rows that satisfy a condition
* **`groupby('column')`** : operation can be used to group rows by a variable *so that we can perform aggregations on them*
* **`orderBy('column')`** : operation sorts the DataFrame based on one or more columns
* **`dropDuplicates()`** : removes the duplicate rows of a DataFrame
* **`withColumnRenamed('former_name', 'new_name')`** : renames a column in the DataFrame

#### DataFrame actions
* **`printSchema()`** : prints the name and type of each column in the DataFrame; see note below
* **`head(n)`** : returns the first n rows
* **`show(n)`** : action prints first n rows in the DataFrame; default n is 20
* **`count()`** : returns the number of rows in the DataFrame
* **`columns`** : attribute that holds the list of column names of a DataFrame
* **`describe()`** : operation computes summary statistics of numerical columns in the DataFrame
* **Correction: `printSchema()` is a method for any Spark dataset/dataframe and not an action**

#### Exercises: Inspecting data in PySpark DataFrame

```
# Print the first 10 observations 
people_df.show(10)

# Count the number of rows 
print("There are {} rows in the people_df DataFrame.".format(people_df.count()))

# Count the number of columns and their names
print("There are {} columns in the people_df DataFrame and their names are {}".format(len(people_df.columns), people_df.columns))
```
***

```
# Select name, sex and date of birth columns
people_df_sub = people_df.select('name', 'sex', 'date of birth')

# Print the first 10 observations from people_df_sub
people_df_sub.show(10)

# Remove duplicate entries from people_df_sub
people_df_sub_nodup = people_df_sub.dropDuplicates()

# Count the number of rows
print("There were {} rows before removing duplicates, and {} rows after removing duplicates".format(people_df_sub.count(), people_df_sub_nodup.count()))
```
***

```
# Filter people_df to select females 
people_df_female = people_df.filter(people_df.sex == "female")

# Filter people_df to select males
people_df_male = people_df.filter(people_df.sex == "male")

# Count the number of rows 
print("There are {} rows in the people_df_female DataFrame and {} rows in the people_df_male DataFrame".format(people_df_female.count(), people_df_male.count()))
```

#### Interacting with DataFrames using PySpark SQL
* DataFrame API vs SQL queries
* The DataFrames API provides a programmatic domain-specific language (DSL) for data
* DataFrame transformations and actions (queries) are much easier to construct programmatically
* SQL queries can be much more concise and easier to understand and portable
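
The contrast between the two styles can be previewed without Spark. In this sketch, Python's built-in `sqlite3` module is only a stand-in for Spark SQL and a list comprehension is a stand-in for the DataFrame API; the `people` rows are hypothetical sample data.

```python
import sqlite3

people = [("Sam", "male"), ("Mary", "female"), ("Ann", "female")]

# Programmatic style: build the filter out of Python expressions
females_api = [name for name, sex in people if sex == "female"]

# SQL style: state the same filter declaratively as a query string
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (name TEXT, sex TEXT)")
conn.executemany("INSERT INTO people VALUES (?, ?)", people)
females_sql = [
    row[0]
    for row in conn.execute(
        "SELECT name FROM people WHERE sex = 'female' ORDER BY rowid"
    )
]
conn.close()

print(females_api, females_sql)
```

Both styles describe the same result; the choice between them is mostly about which is easier to write, read, and reuse for a given task.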

#### Executing SQL Queries
* The SparkSession `sql()` method executes SQL queries
* `sql()` method takes a SQL statement as an argument and returns the result as a DataFrame representing the result of the given query.
* To issue SQL queries against an existing DataFrame we can leverage the `createOrReplaceTempView` function to build a temporary table as shown in this example:
* `df.createOrReplaceTempView("table1")`
* After creating the temporary table, we can simply use the `sql` method

```
df.createOrReplaceTempView("table1")
df2 = spark.sql("SELECT field1, field2 FROM table1")
df2.collect()
```

#### SQL query to extract data

```
test_df.createOrReplaceTempView("test_table")
query = '''SELECT Product_ID FROM test_table'''
test_product_df = spark.sql(query)
test_product_df.show(5)
```
* Because a SQL query returns a DataFrame, all the usual DataFrame operations are available on the result.

#### Exercises: Running SQL Queries Programmatically

```
# Create a temporary table "people"
people_df.createOrReplaceTempView("people")

# Construct a query to select the names of the people from the temporary table "people"
query = '''SELECT name FROM people'''

# Assign the result of Spark's query to people_df_names
people_df_names = spark.sql(query)

# Print the top 10 names of the people
people_df_names.show(10)
```

#### Exercises: SQL queries for filtering Table

```
# Filter the people table to select female sex 
people_female_df = spark.sql('SELECT * FROM people WHERE sex=="female"')

# Filter the people table DataFrame to select male sex
people_male_df = spark.sql('SELECT * FROM people WHERE sex=="male"')

# Count the number of rows in both DataFrames
print("There are {} rows in the people_female_df and {} rows in the people_male_df DataFrames".format(people_female_df.count(), people_male_df.count()))
```

### Data Visualization in PySpark using DataFrames
* Currently there are **three** different methods available for plotting graphs using PySpark DataFrames. 
    * **`Pyspark_dist_explore`** library
    * **`toPandas()`**
    * **`HandySpark`** library
    
#### Pyspark_dist_explore
* `Pyspark_dist_explore` library provides quick insights into DataFrames
* Three functions are currently available:
    * `hist()`
    * `distplot()`
    * `pandas_histogram()`
    
```
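from pyspark_dist_explore import hist
import matplotlib.pyplot as plt
test_df = spark.read.csv("test.csv", header=True, inferSchema=True)
test_df_age = test_df.select('Age')
fig, ax = plt.subplots()
hist(ax, test_df_age, bins = 20, color=['red'])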
```

#### Using toPandas() for plotting DataFrames
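* Converts the PySpark DataFrame into a Pandas DataFrame
* After conversion, it's easy to create charts from Pandas DataFrames using matplotlib or seaborn plotting tools.
* `toPandas()` collects every row onto the driver, so use it only when the data fits in a single machine's memory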

```
test_df = spark.read.csv("test.csv", header=True, inferSchema= True)
test_df_sample_pandas = test_df.toPandas()
test_df_sample_pandas.hist('Age')
```

#### Pandas DataFrame vs PySpark DataFrame
* Pandas DataFrames are in-memory, single-server structures, whereas operations on PySpark DataFrames run in parallel
* Pandas DataFrames are limited by your server's memory, and you process them with the power of a single server.
* In contrast, operations on PySpark DataFrames run in parallel on different nodes in the cluster
* Pandas operations are evaluated eagerly, so the result is produced as soon as an operation is applied, whereas PySpark DataFrame operations are evaluated lazily
* Pandas DataFrames are mutable, whereas PySpark DataFrames are immutable
* Pandas API supports more operations than PySpark DataFrame API
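
The eager versus lazy distinction can be illustrated without a cluster. The sketch below is only a plain-Python analogy, not Spark code: a list comprehension stands in for an eager Pandas-style operation, a generator stands in for a lazy PySpark transformation, and consuming the generator plays the role of an action such as `collect()`. The helper `double` and the `log` list are made up for this illustration.

```python
# Plain-Python analogy (no Spark needed): eager vs. lazy evaluation.
log = []

def double(x):
    log.append(x)  # record every time real work is done
    return 2 * x

numbers = [1, 2, 3, 4]

# Eager (Pandas-like): the work runs as soon as the operation is applied.
eager_result = [double(n) for n in numbers]
calls_after_eager = len(log)  # all four calls have already happened

# Lazy (PySpark-like): defining the pipeline does no work yet.
log.clear()
lazy_pipeline = (double(n) for n in numbers)  # comparable to a transformation
calls_before_action = len(log)  # still zero

# Consuming the pipeline plays the role of an action such as collect().
lazy_result = list(lazy_pipeline)
calls_after_action = len(log)  # the four calls happen only now
```

In PySpark the same idea means that a chain of `select()` and `filter()` calls only builds a plan, and the work is triggered by an action such as `show()` or `count()`.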

#### HandySpark method of visualization
* Relatively new package
* Designed to improve the PySpark user experience, especially for exploratory data analysis (EDA)
* It makes fetching data or computing statistics for columns really easy, returning pandas objects straight away
* It brings the long-missing capability of plotting data while retaining the advantage of performing the distributed computation

#### Exercises: PySpark DataFrame visualization 

```

import matplotlib.pyplot as plt

# Check the column names of names_df
print("The column names of names_df are", names_df.columns)

# Convert to Pandas DataFrame  
df_pandas = names_df.toPandas()

# Create a horizontal bar plot
df_pandas.plot(kind='barh', x='Name', y='Age', colormap='winter_r')
plt.show()
```

#### Exercises: Part 1: Create a DataFrame from CSV file

```
# Load the Dataframe
fifa_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Check the schema of columns
fifa_df.printSchema()

# Show the first 10 observations
fifa_df.show(10)

# Print the total number of rows
print("There are {} rows in the fifa_df DataFrame".format(fifa_df.count()))
```

#### Exercises: Part 2: SQL Queries on DataFrame

```
# Create a temporary view of fifa_df
fifa_df.createOrReplaceTempView('fifa_df_table')

# Construct the "query"
query = '''SELECT Age FROM fifa_df_table WHERE Nationality == "Germany"'''

# Apply the SQL "query"
fifa_df_germany_age = spark.sql(query)

# Generate basic statistics
fifa_df_germany_age.describe().show()
```

#### Exercises: Part 3: Data visualization

```
import matplotlib.pyplot as plt

# Convert fifa_df_germany_age to a Pandas DataFrame
fifa_df_germany_age_pandas = fifa_df_germany_age.toPandas()

# Plot the 'Age' density of Germany Players
fifa_df_germany_age_pandas.plot(kind='density')
plt.show()
```

## $\star$ Machine Learning with PySpark MLlib
PySpark MLlib is Apache Spark's scalable machine learning library in Python, consisting of common learning algorithms and utilities. Throughout this last chapter, you'll learn important Machine Learning algorithms. You will build a movie recommendation engine and a spam filter, and use k-means clustering.

### Overview of PySpark MLlib
* PySpark MLlib is a component of Apache Spark for machine learning with the goal of making practical ML scalable and easy.
* Provides various tools including:
    * **ML Algorithms:** collaborative filtering, classification, and clustering
    * **Featurization:** feature extraction, transformation, dimensionality reduction, and selection
    * **Pipelines:** tools for constructing, evaluating, and tuning ML Pipelines
    
#### Why PySpark MLlib?
* scikit-learn algorithms only work for small datasets on a single machine
* PySpark MLlib algorithms are designed for parallel processing on a cluster
* In addition to Python, MLlib supports languages such as Scala, Java, and R
* Provides a high-level API to build ML pipelines

#### PySpark MLlib Algorithms
* **Classification (Binary and Multiclass) and Regression:** Linear SVMs, logistic regression, decision trees, random forests, gradient-boosted trees, naive Bayes, linear least squares, Lasso, ridge regression, isotonic regression
* **Collaborative filtering:** Alternating least squares (ALS)
* **Clustering:** K-means, Gaussian mixture, Bisecting K-means and Streaming K-Means

#### The three C's of ML in PySpark MLlib
* **Collaborative filtering (recommender engines):** Produce recommendations
* **Classification:** Identifying which of a set of categories a new observation belongs to
* **Clustering:** Groups data based on similar characteristics

#### PySpark MLlib imports
* `pyspark.mllib.recommendation`
* `from pyspark.mllib.recommendation import ALS`
***
* `pyspark.mllib.classification`
* `from pyspark.mllib.classification import LogisticRegressionWithLBFGS`
***
* `pyspark.mllib.clustering`
* `from pyspark.mllib.clustering import KMeans`

#### Exercises: PySpark MLlib algorithms
```
# Import the library for ALS
from pyspark.mllib.recommendation import ALS

# Import the library for Logistic Regression
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Import the library for Kmeans
from pyspark.mllib.clustering import KMeans
```

### Collaborative filtering
* **Collaborative filtering** is a method of making automatic predictions about the interests of a user by collecting preferences or taste information from many users
* **Collaborative filtering** works by finding users that share common interests
* Collaborative filtering is one of the most commonly used algorithms for recommender systems
* Two approaches:
    * **The User-User Approach**: Finds users that are similar to the target user
    * **The Item-Item Approach**: Finds and recommends items that are similar to items associated with the target user
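
The user-user approach can be sketched in plain Python. This is a minimal illustration under stated assumptions, not how MLlib implements recommendations: the `ratings` dictionary, the user and item names, and the choice of cosine similarity over co-rated items are all made up for the example.

```python
from math import sqrt

# Hypothetical ratings: user -> {item: rating}. Names and values are invented.
ratings = {
    "alice": {"A": 5.0, "B": 4.0, "C": 1.0},
    "bob":   {"A": 4.0, "B": 5.0, "D": 4.0},
    "carol": {"A": 1.0, "C": 5.0, "D": 2.0},
}

def cosine_similarity(a, b):
    """Cosine similarity computed over the items both users have rated."""
    common = set(a) & set(b)
    if not common:
        return 0.0
    dot = sum(a[i] * b[i] for i in common)
    norm_a = sqrt(sum(a[i] ** 2 for i in common))
    norm_b = sqrt(sum(b[i] ** 2 for i in common))
    return dot / (norm_a * norm_b)

def most_similar_user(target):
    """Return the other user whose ratings are closest to the target's."""
    others = [u for u in ratings if u != target]
    return max(others, key=lambda u: cosine_similarity(ratings[target], ratings[u]))

def recommend(target):
    """Items the nearest neighbour rated that the target has not rated yet."""
    neighbour = most_similar_user(target)
    unseen = set(ratings[neighbour]) - set(ratings[target])
    return sorted(unseen, key=lambda i: ratings[neighbour][i], reverse=True)

nearest = most_similar_user("alice")
suggestions = recommend("alice")
```

The item-item approach follows the same pattern with the roles swapped: similarities are computed between items (over the users who rated both) instead of between users.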
    
#### Rating class in pyspark.mllib.recommendation submodule
* The Rating class is a wrapper around a tuple (user, product, and rating)
* Useful for parsing the RDD and creating a tuple of user, product, and rating

```
from pyspark.mllib.recommendation import Rating
r = Rating(user = 1, product = 2, rating = 5.0)
(r[0], r[1], r[2])
```
* result: `(1, 2, 5.0)`
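
Because `Rating` behaves like a named tuple, its fields can be read by position or by name. The sketch below uses a plain `collections.namedtuple` as a stand-in so it runs without Spark, and the helper `parse_rating` is a hypothetical name introduced only for this illustration. It assumes each input line has the form `user,product,rating`.

```python
from collections import namedtuple

# Stand-in for pyspark.mllib.recommendation.Rating, which behaves like a
# named tuple with the fields (user, product, rating).
Rating = namedtuple("Rating", ["user", "product", "rating"])

def parse_rating(line):
    """Turn a 'user,product,rating' text line into a Rating (hypothetical helper)."""
    user, product, rating = line.split(",")[:3]
    return Rating(int(user), int(product), float(rating))

r = parse_rating("1,2,5.0")

# Fields are reachable by position or by name.
by_index = (r[0], r[1], r[2])
by_name = (r.user, r.product, r.rating)
```

With a real RDD of text lines, the same parsing function could be passed to `map()` to build an RDD of `Rating` objects.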

#### Splitting the data using randomSplit()
* Splitting data into training and testing sets is important for evaluating predictive models
* Typically a larger portion of the data is assigned to training than to testing (70-30 or 80-20 split)
* PySpark's `randomSplit()` method randomly splits an RDD according to the provided weights and returns a list of RDDs

```
data = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
training, test = data.randomSplit([0.6, 0.4])
training.collect()
test.collect()
```
result (one possible outcome; the split is random, so it varies between runs unless a `seed` is passed):

```
[1, 2, 5, 6, 9, 10]
[3, 4, 7, 8]
```
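
The idea behind a weighted random split can be sketched in plain Python. This is an illustration of the concept only, not Spark's implementation: the function `random_split` and its `seed` handling are assumptions made for the example. Each element gets an independent random draw and lands in the split whose weight range contains that draw, which is why the split sizes only approximate the requested proportions.

```python
import random

def random_split(items, weights, seed=None):
    """Assign each item independently to a split, with probability
    proportional to that split's weight (a sketch, not Spark's code)."""
    rng = random.Random(seed)
    total = sum(weights)

    # Cumulative upper bounds of the normalized weights,
    # e.g. [0.6, 0.4] gives roughly [0.6, 1.0].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    splits = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(item)
                break
    return splits

data = list(range(1, 11))
training, test = random_split(data, [0.6, 0.4], seed=42)
training_again, test_again = random_split(data, [0.6, 0.4], seed=42)
```

Every element ends up in exactly one split, and reusing the same seed reproduces the same split, which is useful when results need to be comparable across runs.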

#### Alternating Least Squares (ALS)
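* The Alternating Least Squares (ALS) algorithm in `spark.mllib` provides collaborative filtering
* `ALS.train(ratings, rank, iterations)`, where `rank` is the number of latent factors in the model and `iterations` is the number of ALS iterations to run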

```
r1 = Rating(1, 1, 1.0)
r2 = Rating(1, 2, 2.0)
r3 = Rating(2, 1, 2.0)
ratings = sc.parallelize([r1, r2, r3])
ratings.collect()
```
returns:

```
[Rating(user=1, product=1, rating=1.0),
 Rating(user=1, product=2, rating=2.0),
 Rating(user=2, product=1, rating=2.0)]
```
* `model = ALS.train(ratings, rank=10, iterations=10)`
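
To give some intuition for what "alternating" means, here is a pure-Python sketch of ALS restricted to a single latent factor (rank 1). It is a simplified illustration and not MLlib's implementation: the function `als_rank1`, its `reg` parameter, and the starting values are assumptions made for the example. Each rating is modeled as `u[user] * v[product]`, and the two sets of factors are solved in turn with the other held fixed.

```python
def als_rank1(ratings, iterations=50, reg=0.0):
    """Fit rating ~ u[user] * v[product] by alternating least squares.

    ratings: list of (user, product, rating) tuples.
    reg: optional L2 penalty (hypothetical parameter for this sketch).
    """
    users = sorted({user for user, _, _ in ratings})
    products = sorted({product for _, product, _ in ratings})
    u = {user: 1.0 for user in users}
    v = {product: 1.0 for product in products}

    for _ in range(iterations):
        # Step 1: hold the product factors fixed and solve each user factor
        # in closed form (one-dimensional least squares).
        for user in users:
            seen = [(p, r) for usr, p, r in ratings if usr == user]
            numerator = sum(r * v[p] for p, r in seen)
            denominator = sum(v[p] ** 2 for p, r in seen) + reg
            u[user] = numerator / denominator

        # Step 2: hold the user factors fixed and solve each product factor.
        for product in products:
            seen = [(usr, r) for usr, p, r in ratings if p == product]
            numerator = sum(r * u[usr] for usr, r in seen)
            denominator = sum(u[usr] ** 2 for usr, r in seen) + reg
            v[product] = numerator / denominator
    return u, v

# Same three ratings as the Rating objects above, as plain tuples.
ratings = [(1, 1, 1.0), (1, 2, 2.0), (2, 1, 2.0)]
u, v = als_rank1(ratings)
predicted = {(user, product): u[user] * v[product] for user, product, _ in ratings}
```

On this tiny dataset a rank-1 model can reproduce all three ratings, so the predictions converge to the observed values. MLlib's `ALS.train` generalizes the same alternation to `rank` factors per user and product and applies regularization.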

#### predictAll() - Returns RDD of Rating Objects
* **After training the model, the next step is predicting the ratings for the user and product pairs.**
* The `predictAll()` method takes an RDD of (user, product) pairs without ratings and generates a predicted rating for each pair
* Below we create an RDD from a list of tuples containing `userId` and `productId` using the SparkContext's `parallelize()` method.
* `unrated_RDD = sc.parallelize([(1,2), (1,1)])`

```
predictions = model.predictAll(unrated_RDD)
predictions.collect()
```
returns:

```
[Rating(user=1, product=1, rating=1.0000278574351853),
 Rating(user=1, product=2, rating=1.9890355703778122)]
```

#### Model evaluation using MSE
* The MSE measures the average of the squares of the errors between what is estimated and the existing data.
* The MSE is the average value of the square of `(actual rating - predicted rating)`
* We'll first reorganize the ratings and predictions into `((user, product), rating)` pairs, starting with the ratings.

```
rates = ratings.map(lambda x: ((x[0], x[1]), x[2]))
rates.collect()
```
returns: `[((1, 1), 1.0), ((1, 2), 2.0), ((2,1), 2.0)]`

* Next, we reshape the predictions RDD in the same way:

```
preds = predictions.map(lambda x: ((x[0], x[1]), x[2]))
preds.collect()
```
returns: `[((1,1), 1.0000278574351853), ((1,2), 1.9890355703778122)]`

* Next, we join the ratings RDD with the predictions RDD; the result looks as follows:

```
rates_preds = rates.join(preds)
rates_preds.collect()
```
returns: `[((1,2),(2.0, 1.9890355703778122)), ((1,1), (1.0, 1.0000278574351853))]`
* Finally, we map each joined pair to its squared difference and take the mean
* `MSE = rates_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()`
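
The same join-then-average computation can be traced in plain Python using the values shown above. This is only a sketch of the arithmetic: dictionaries keyed by `(user, product)` stand in for the pair RDDs, and the dictionary comprehension plays the role of `rates.join(preds)`.

```python
# ((user, product), rating) pairs from the examples above, as dictionaries.
rates = {(1, 1): 1.0, (1, 2): 2.0, (2, 1): 2.0}
preds = {(1, 1): 1.0000278574351853, (1, 2): 1.9890355703778122}

# Inner join on the (user, product) key, like rates.join(preds).
# The key (2, 1) has no prediction, so it drops out of the result.
rates_preds = {key: (rates[key], preds[key]) for key in rates if key in preds}

# Squared difference between the actual and predicted rating for each pair.
squared_errors = [(actual - predicted) ** 2 for actual, predicted in rates_preds.values()]

# MSE is the mean of the squared errors (roughly 6e-5 for these values).
mse = sum(squared_errors) / len(squared_errors)
```

Only keys present on both sides survive the join, so the MSE is computed over the pairs that have both an actual and a predicted rating.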

#### Exercises: Loading MovieLens dataset into RDDs
* Dataset: [MovieLens 100k dataset](https://grouplens.org/datasets/movielens/100k/)

```
# Load the data into RDD
data = sc.textFile(file_path)

# Split the RDD 
ratings = data.map(lambda l: l.split(','))

# Transform the ratings RDD
ratings_final = ratings.map(lambda line: Rating(int(line[0]), int(line[1]), float(line[2])))

# Split the data into training and test
training_data, test_data = ratings_final.randomSplit([0.8, 0.2])
```

#### Model training and predictions

```
# Create the ALS model on the training data
model = ALS.train(training_data, rank=10, iterations=10)

# Drop the ratings column 
testdata_no_rating = test_data.map(lambda p: (p[0], p[1]))

# Predict ratings for the test data
predictions = model.predictAll(testdata_no_rating)

# Return the first 2 rows of the RDD
predictions.take(2)
```

#### Exercises: Model evaluation using MSE

```
# Prepare ratings data
rates = ratings_final.map(lambda r: ((r[0], r[1]), r[2]))

# Prepare predictions data
preds = predictions.map(lambda r: ((r[0], r[1]), r[2]))

# Join the ratings data with predictions data
rates_and_preds = rates.join(preds)

# Calculate and print MSE
MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error of the model for the test data = {:.2f}".format(MSE))
```