In [1]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:99% !important; }</style>"))

In [2]:
import pyspark
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os

# Chapter 1: Introduction to Big Data analysis with Spark

## What is big data?

1. Fundamentals of Big Data

Welcome to the first video of the Big Data Fundamentals via PySpark course. My name is Upendra Devisetty and I am a Science Analyst at CyVerse. Let's get started.

2. What is Big Data?

What exactly is Big Data? There is no single definition of Big Data, because projects, vendors, practitioners, and business professionals use the term quite differently. According to Wikipedia, big data is a term used to refer to the study and applications of data sets that are too complex for traditional data-processing software.

3. The 3 V's of Big Data

There are three Vs of Big Data that are used to describe its characteristics: volume, velocity, and variety. Volume refers to the size of data. Variety refers to the different sources and formats of data. Velocity is the speed at which data is generated and becomes available for processing.

4. Big Data concepts and Terminology

Now let's take a look at some of the concepts and terminology of Big Data. Clustered computing is the pooling of resources of multiple machines to complete jobs. Parallel computing is a type of computation in which many calculations are carried out simultaneously. Distributed computing involves nodes, or networked computers, that run jobs in parallel. Batch processing refers to breaking a job into smaller pieces and running each piece on an individual machine. Real-time processing demands that information is processed and made ready immediately.

5. Big Data processing systems

There are two popular frameworks for Big Data processing. The first is the highly successful Hadoop/MapReduce framework, an open-source and scalable framework for batch data. The second is the very popular Apache Spark, a parallel framework for processing Big Data across clustered computers. It is also open source and is suited for both batch and real-time data processing. In this course, you'll learn about Apache Spark.

6. Features of Apache Spark framework

Let's talk about the main features of Apache Spark. Spark distributes data and computation across multiple computers, executing complex multi-stage applications such as machine learning. Spark runs most computations in memory and thereby provides better performance for applications such as interactive data mining. Spark can run an application up to 100 times faster than Hadoop MapReduce in memory, and up to 10 times faster when running on disk. Spark is mainly written in the Scala language but also has support for Java, Python, R, and SQL.

7. Apache Spark Components

Apache Spark is a powerful alternative to Hadoop MapReduce, with rich features like machine learning, real-time stream processing, and graph computations. At the center of the ecosystem is Spark Core, which contains the basic functionality of Spark. The rest of Spark's libraries are built on top of it. First is Spark SQL, a library for processing structured and semi-structured data in Python, Java, and Scala. The second is MLlib, a library of common machine learning algorithms. The third component is GraphX, a collection of algorithms and tools for manipulating graphs and performing parallel graph computations. Finally, Spark Streaming is a scalable, high-throughput processing library for real-time data. In this course, you'll learn about Spark SQL and MLlib.

8. Spark modes of deployment

Spark can be run in two modes. The first is local mode, where you run Spark on a single machine such as your laptop. Local mode is very convenient for testing, debugging, and demonstration purposes. The second is cluster mode, where Spark runs on a cluster. Cluster mode is mainly used for production. The development workflow is that you start in local mode and transition to cluster mode. During the transition from local to cluster mode, no code change is necessary. In this course, you'll be using local mode.

9. Coming up next - PySpark

In the next video, you'll learn about PySpark, which is the Python API for Spark.

## PySpark: Spark with Python
1. PySpark: Spark with Python

In the last video, you were introduced to Apache Spark, a fast and general-purpose framework for Big Data processing. Apache Spark provides high-level APIs in Scala, Java, Python, and R. In this video, you'll learn about PySpark, which is the Python API for Spark.

2. Overview of PySpark

Apache Spark was originally written in the Scala programming language. PySpark was developed to support Python with Spark. Unlike earlier versions, recent versions of PySpark provide computation power similar to Scala. The APIs in PySpark are similar to the pandas and scikit-learn Python packages, so the barrier to entry for beginners is low.

3. What is Spark shell?

Spark comes with interactive shells that enable ad-hoc data analysis. The Spark shell is an interactive environment through which one can access Spark's functionality quickly and conveniently. It is particularly helpful for fast interactive prototyping before running jobs on clusters. Unlike most other shells, the Spark shell allows you to interact with data that is distributed on disk or in memory across many machines, and Spark takes care of automatically distributing this processing. Spark provides the shell in three programming languages: spark-shell for Scala, PySpark for Python, and sparkR for R.

4. PySpark shell

The PySpark shell is the Python-based command line tool for developing Spark's interactive applications in Python. PySpark helps data scientists interface with Spark data structures in Apache Spark and Python. Similar to the Scala shell, the PySpark shell has been augmented to support connecting to a cluster. In this course, you'll use the PySpark shell.

5. Understanding SparkContext

In order to interact with Spark using the PySpark shell, you need an entry point. SparkContext is an entry point to interact with underlying Spark functionality. Before understanding SparkContext, let's understand what an entry point is. An entry point is where control is transferred from the operating system to the provided program. In simpler terms, it's like a key to your house. Without the key you cannot enter the house; similarly, without an entry point, you cannot run any PySpark jobs. You can access the SparkContext in the PySpark shell as a variable named sc. Now let's take a look at some of the important attributes of SparkContext.

6. Inspecting SparkContext

The first is the version. This attribute shows the version of Spark that you are currently running. In this example, `sc.version` shows the version of Spark that is running in this course's environment. The second is the Python version. This attribute shows the version of Python that Spark is currently using. In this example, `sc.pythonVer` shows the version of Python that is running in this course's environment. The final attribute is the master. Master is the URL of the cluster, or the "local" string to run in local mode. In this example, `sc.master` returns `local[*]`, meaning the SparkContext acts as a master on a local node using all available threads on the computer where it is running.

7. Loading data in PySpark

You can load your raw data into PySpark using SparkContext in two different ways. The first is SparkContext's `parallelize` method on a list. For example, you can create a parallelized collection holding the numbers 1 to 5. The second is SparkContext's `textFile` method on a file. For example, you can load a text file named "test.txt" using SparkContext's textFile method.

8. Let's practice

Now that you understand PySpark, let's write your first Spark code in the PySpark shell.

In [3]:
spark_home = os.environ.get('SPARK_HOME', None)
java_home = os.environ.get('JAVA_HOME', None)
print(spark_home,java_home)


None C:\Program Files\Microsoft\jdk-17.0.7.7-hotspot\


In [4]:
# getOrCreate() reuses an existing SparkContext, so re-running this cell does not fail
sc = pyspark.SparkContext.getOrCreate()

### Understanding SparkContext
A SparkContext represents the entry point to Spark functionality. It's like a key to your car. When you run any Spark application, a driver program starts; it contains the main function, and your SparkContext is initialized there. In the PySpark shell, PySpark automatically creates a `SparkContext` for you (so you don't have to create it yourself) and exposes it through a variable named `sc`.

In this simple exercise, you'll find out the attributes of the SparkContext in your PySpark shell which you'll be using for the rest of the course.

**Instructions**

- Print the version of SparkContext in the PySpark shell.
- Print the Python version of SparkContext in the PySpark shell.
- What is the master of SparkContext in the PySpark shell?

In [5]:
# Print the version of SparkContext
print("The version of Spark Context in the PySpark shell is", sc.version)

# Print the Python version of SparkContext
print("The Python version of Spark Context in the PySpark shell is", sc.pythonVer)

# Print the master of SparkContext
print("The master of Spark Context in the PySpark shell is", sc.master)

The version of Spark Context in the PySpark shell is 3.4.0
The Python version of Spark Context in the PySpark shell is 3.9
The master of Spark Context in the PySpark shell is local[*]


### Interactive Use of PySpark
Spark comes with an interactive Python shell in which PySpark is already installed. The PySpark shell is useful for basic testing and debugging, and it is quite powerful. The easiest way to demonstrate the power of PySpark's shell is to start using it. In this example, you'll load a simple list containing the numbers 1 to 100 in the PySpark shell.

The most important thing to understand here is that we are not creating any SparkContext object, because PySpark automatically creates one named `sc` by default in the PySpark shell. (This notebook is not the PySpark shell, which is why `sc` was created explicitly earlier.)

**Instructions**

- Create a Python list named numb containing the numbers 1 to 100.
- Load the list into Spark using Spark Context's parallelize method and assign it to a variable spark_data.

In [6]:
# Create a Python list of numbers from 1 to 100 
numb = range(1, 101)  # the end is exclusive, so this yields 1 to 100

# Load the list into PySpark  
spark_data = sc.parallelize(numb)

### Loading data in PySpark shell
In PySpark, we express our computation through operations on distributed collections that are automatically parallelized across the cluster. In the previous exercise, you saw an example of loading a list as a parallelized collection; in this exercise, you'll load data from a local file in the PySpark shell.

Remember, you already have a SparkContext `sc` available in your workspace, and `file_path` holds the path to the `README.md` file.

**Instructions**

- Load a local text file README.md in PySpark shell.

In [7]:
# Load a local file into PySpark shell
file_path = 'README.md'
lines = sc.textFile(file_path)

## Review of functional programming in Python

1. Review of functional programming in Python

Understanding PySpark becomes a lot easier if we understand functional programming principles in Python. In this video, let's review some of the Python functions such as lambda, map and filter.

2. What are anonymous functions in Python?

Python supports the creation of anonymous functions, that is, functions that are not bound to a name at runtime, using a construct called lambda. Lambda functions are very powerful, well integrated into Python, and often used in conjunction with typical functional concepts like the map and filter functions. Like def, lambda creates a function to be called later in the program. However, it returns the function instead of assigning it to a name. This is why lambdas are known as anonymous functions. In practice, they are used as a way to inline a function definition or to defer execution of code.

3. Lambda function syntax

Lambda functions can be used whenever function objects are required. They can have any number of arguments but only one expression, which is evaluated and returned. The general syntax of a lambda function is `lambda arguments: expression`. Here is an example of a lambda function: in `lambda x: x * 2`, x is the argument and `x * 2` is the expression that gets evaluated and returned. This function has no name. It returns a function object, which is assigned to the identifier `double` here. Applying the lambda function to a number such as 3 returns 6, which is double the original number.

4. Difference between def vs lambda functions

Let's take a look at the differences between def and lambda. Here is Python code that computes the cube of a number, showing the difference between a normal Python function defined with def and an anonymous function defined with lambda. Both do exactly the same thing. The main difference is that the lambda definition does not include a return statement; it always contains an expression, which is returned. Also note that we can put a lambda definition anywhere a function is expected, and we don't have to assign it to a variable at all, unlike a normal Python function defined with def.

5. Use of Lambda function in Python - map()

We use lambda functions when we require a nameless function for a short period of time. Most of the time we use lambdas with built-in functions like map and filter. The map function calls the given function on every item in the list and returns the result for each item; in Python 3 it returns an iterator, so wrap it in `list()` to get a list. The general syntax of the map function is `map(function, list)`: it takes in a function and a list. Here is an example of the map function with lambda that adds the number 2 to every item in a list. The result shows that 2 is added to 1, 2, 3, 4, giving 3, 4, 5, 6.

6. Use of Lambda function in python - filter()

The filter function in Python takes in a function and a list as arguments. The function is called with every item in the list, and the result contains only the items for which the function evaluates to True (again an iterator in Python 3). The general syntax of the filter function is `filter(function, list)`; similar to map, it takes a function and a list as arguments. Here is an example of filter with lambda that keeps only the odd numbers from a list. Filtering the list containing 1, 2, 3, 4 results in 1 and 3, the only odd numbers in the input list.

7. Let's practice

Lambda functions are incredibly useful. Before going deeper into PySpark, let's practice some lambda functions in the PySpark shell.
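The examples narrated in this video (the `double` lambda, a cube function written with both `def` and `lambda`, `map()` adding 2, and `filter()` keeping odd numbers) can be reconstructed in plain Python. This is a sketch based on the narration rather than the slides' exact code; the names `cube`, `cube_lambda`, and `items` are illustrative.

```python
# A lambda bound to a name: x is the argument, x * 2 is the returned expression.
double = lambda x: x * 2
print(double(3))  # 6

# The same cube function, once with def and once with lambda.
def cube(x):
    return x ** 3

cube_lambda = lambda x: x ** 3
print(cube(10), cube_lambda(10))  # 1000 1000

# map(): apply a function to every item; list() materializes the iterator.
items = [1, 2, 3, 4]
plus_two = list(map(lambda x: x + 2, items))
print(plus_two)  # [3, 4, 5, 6]

# filter(): keep only the items for which the function returns True.
odds = list(filter(lambda x: x % 2 != 0, items))
print(odds)  # [1, 3]
```

Binding a lambda to a name, as with `double`, mirrors the video; PEP 8 recommends `def` for functions that need a name and reserves lambdas for inline use such as the `map()` and `filter()` calls.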

### Use of lambda() with map()
The `map()` function in Python applies a given function to each item of an iterable (list, tuple, etc.) and returns an iterator over the results; wrap it in `list()` to get a list. The general syntax of the `map()` function is `map(fun, iter)`. We can also use lambda functions with `map()`. The general syntax of the `map()` function with `lambda()` is `map(lambda <argument>:<expression>, iter)`. Refer to slide 5 of video 1.7 for general help on the `map()` function with `lambda()`.

In this exercise, you'll be using lambda function inside the `map()` built-in function to square all numbers in the list.

**Instructions**


- Print `my_list` which is available in your environment.
- Square each item in my_list using `map()` and `lambda()`.
- Print the result of map function.

In [11]:
my_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [12]:
# Print my_list in the console
print("Input list is", my_list)

# Square all numbers in my_list
squared_list_lambda = list(map(lambda x: x**2 , my_list))

# Print the result of the map function
print("The squared numbers are", squared_list_lambda)

Input list is [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
The squared numbers are [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]


### Use of lambda() with filter()
Another function that is used extensively in Python is the `filter()` function. The `filter()` function in Python takes in a function and a list as arguments. The general syntax of the `filter()` function is `filter(function, list_of_input)`. Similar to the `map()`, `filter()` can be used with lambda function. The general syntax of the `filter()` function with `lambda()` is `filter(lambda <argument>:<expression>, list)`. Refer to slide 6 of video 1.7 for general help of the `filter()` function with `lambda()`.

In this exercise, you'll be using `lambda()` function inside the `filter()` built-in function to find all the numbers divisible by 10 in the list.

**Instructions**

- Print `my_list2`, which is available in your environment.
- Filter the numbers divisible by 10 from `my_list2` using `filter()` and `lambda()`.
- Print the numbers divisible by 10 from `my_list2`.

In [13]:
my_list2 = [10, 21, 31, 40, 51, 60, 72, 80, 93, 101]

In [14]:
# Print my_list2 in the console
print("Input list is:", my_list2)

# Filter numbers divisible by 10
filtered_list = list(filter(lambda x: (x%10 == 0), my_list2))

# Print the numbers divisible by 10
print("Numbers divisible by 10 are:", filtered_list)

Input list is: [10, 21, 31, 40, 51, 60, 72, 80, 93, 101]
Numbers divisible by 10 are: [10, 40, 60, 80]


___

# Chapter 2: Programming in PySpark RDD’s

## Abstracting Data with RDDs

1. Introduction to PySpark RDD

In the first chapter, you learned about the different components of Spark, namely Spark Core, Spark SQL, and Spark MLlib. In this chapter, we will start with RDDs, which are Spark's core abstraction for working with data.

2. What is RDD?

Let's get started. RDD stands for Resilient Distributed Datasets. It is simply a collection of data distributed across the cluster. The RDD is the fundamental, backbone data type in PySpark. When Spark starts processing data, it divides the data into partitions and distributes them across the cluster nodes, with each node containing a slice of the data.

3. Decomposing RDDs

Now, let's take a look at the different features of an RDD. The name RDD captures 3 important properties. Resilient means the ability to withstand failures and recompute missing or damaged partitions. Distributed means spanning the jobs across multiple nodes in the cluster for efficient computation. Datasets means a collection of partitioned data, e.g., arrays, tables, tuples or other objects.

4. Creating RDDs. How to do it?

There are three different methods for creating RDDs. You have already seen two of them in the previous chapter, even though you were not aware that you were creating RDDs. The simplest method is to take an existing collection of objects (e.g., a list, an array or a set) and pass it to SparkContext's parallelize method. A more common way is to load data from external datasets, such as files stored in HDFS, objects in Amazon S3 buckets, or lines in a text file stored locally, using SparkContext's textFile method. Finally, RDDs can also be created from existing RDDs, which we will see in the next video.

5. Parallelized collection (parallelizing)

In the first method, RDDs are created from a list or a set using SparkContext's `parallelize()` method. Let's try to understand how RDDs are created using this method with a couple of examples. In the first example, an RDD named numRDD is created from a Python list containing the numbers 1, 2, 3, and 4. In the second example, an RDD named helloRDD is created from the 'hello world' string. You can confirm that the object created is an RDD using Python's built-in type function.

6. From external datasets

Creating RDDs from external datasets is by far the most common method in PySpark. In this method, RDDs are created using SparkContext's textFile method. In this simple example, an RDD named fileRDD is created from the lines of a README.md file stored locally on your computer. As with the previous method, you can confirm the RDD using the type function.

7. Understanding Partitioning in PySpark

Data partitioning is an important concept in Spark, and understanding how Spark deals with partitions allows one to control parallelism. A partition in Spark is a division of a large dataset, with the parts stored in different locations across the cluster. By default, Spark partitions the data when an RDD is created, based on several factors such as available resources and the external dataset. However, this behavior can be controlled by passing a second argument: `numSlices` for `parallelize()`, which sets the number of partitions, and `minPartitions` for `textFile()`, which defines the minimum number of partitions to be created for an RDD. In the first example, we create an RDD named numRDD from a list of 10 integers using SparkContext's parallelize method with 6 partitions. In the second example, we create another RDD named fileRDD using SparkContext's textFile method with 6 partitions. The number of partitions in an RDD can always be found using the `getNumPartitions` method.

8. Let's practice

In the next video, you'll see the final method of creating RDDs. For now, let's create some RDDs like you just learned.

## RDDs from Parallelized collections
Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It is an immutable distributed collection of objects. Since the RDD is the fundamental, backbone data type in Spark, it is important that you understand how to create it. In this exercise, you'll create your first RDD in PySpark from a collection of words.

Remember you already have a SparkContext `sc` available in your workspace.

**Instructions**

- Create an RDD named `RDD` from a list of words.
- Confirm the object created is RDD.

In [15]:
# Create an RDD from a list of words
RDD = sc.parallelize(["Spark", "is", "a", "framework", "for", "Big Data processing"])

# Print out the type of the created object
print("The type of RDD is", type(RDD))

The type of RDD is <class 'pyspark.rdd.RDD'>


## RDDs from External Datasets
PySpark can easily create RDDs from files that are stored in external storage devices such as HDFS (Hadoop Distributed File System), Amazon S3 buckets, etc. However, the most common method of creating RDDs is from files stored in your local file system. This method takes a file path and reads it as a collection of lines. In this exercise, you'll create an RDD from the file path (`file_path`) with the file name `README.md`, which is already available in your workspace.

Remember you already have a SparkContext `sc` available in your workspace.

**Instructions**

- Print the `file_path` in the PySpark shell.
- Create an RDD named `fileRDD` from a `file_path`.
- Print the type of the `fileRDD` created.

In [16]:
# Print the file_path
print("The file_path is", file_path)

# Create a fileRDD from file_path
fileRDD = sc.textFile(file_path)

# Check the type of fileRDD
print("The file type of fileRDD is", type(fileRDD))

The file_path is README.md
The file type of fileRDD is <class 'pyspark.rdd.RDD'>


## Partitions in your data
SparkContext's `textFile()` method takes an optional second argument called `minPartitions` for specifying the minimum number of partitions. In this exercise, you'll create an RDD named `fileRDD_part` with 5 partitions and then compare that with `fileRDD` that you created in the previous exercise. Refer to the "Understanding Partition" slide in video 2.1 to know the methods for creating and getting the number of partitions in an RDD.

Remember, you already have a SparkContext `sc`, file_path and fileRDD available in your workspace.

**Instructions**

- Find the number of partitions in the `fileRDD` RDD.
- Create an RDD named `fileRDD_part` from the file path but create 5 partitions.
- Confirm the number of partitions in the new `fileRDD_part` RDD.

In [17]:
# Check the number of partitions in fileRDD
print("Number of partitions in fileRDD is", fileRDD.getNumPartitions())

# Create a fileRDD_part from file_path with 5 partitions
fileRDD_part = sc.textFile(file_path, minPartitions = 5)

# Check the number of partitions in fileRDD_part
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())

Number of partitions in fileRDD is 2
Number of partitions in fileRDD_part is 5


## Basic RDD Transformations and Actions

1. RDD operations in PySpark
In the last video, you learned how to load your data into RDDs. In this video, you'll learn about the various operations that RDDs support in PySpark.

2. Overview of PySpark operations

RDDs in PySpark support two different types of operations: Transformations and Actions. Transformations are operations on RDDs that return a new RDD, and Actions are operations that perform some computation on the RDD and return a value.

3. RDD Transformations

The most important feature that helps RDDs with fault tolerance and optimizing resource use is lazy evaluation. So what is lazy evaluation? Spark creates a graph from all the operations you perform on an RDD, and execution of the graph starts only when an action is performed on the RDD, as shown in this figure. This is called lazy evaluation in Spark. The RDD transformations we will look at in this video are map, filter, flatMap and union.

4. map() Transformation

The map transformation takes in a function and applies it to each element in the RDD. Say you have an input RDD with elements 1, 2, 3, 4. The map transformation applies the function to each element, and the result of the function becomes the new value of that element in the resulting RDD. In this example, the square function is applied to each element of the RDD. Let's understand this with an example. We first create an RDD using SparkContext's parallelize method on a list containing the elements 1, 2, 3, 4. Next, we apply the map transformation to square each element of the RDD.

5. filter() Transformation

The filter transformation takes in a function and returns an RDD that only has the elements that pass the condition. Suppose we have an input RDD with the numbers 1, 2, 3, 4 and we want to select the numbers greater than 2; we can apply the filter transformation. Here is an example of the filter transformation, where we use the same RDD as before and keep only the numbers that are greater than 2.

6. flatMap() Transformation

flatMap is similar to the map transformation, except that it can return multiple values for each element in the source RDD. A simple usage of flatMap is splitting an input string into words. Here, you have an input RDD with two elements, "hello world" and "how are you". Applying flatMap with a split function results in 5 elements in the resulting RDD: "hello", "world", "how", "are", "you". As you can see, even though the input RDD has 2 elements, the output RDD contains 5 elements. In this example, we create an RDD from a list containing the strings "hello world" and "how are you". Next, we apply flatMap along with the split function to split the input strings into individual words.

7. union() Transformation

The union transformation returns the union of one RDD with another RDD. In this figure, we are filtering the inputRDD to create two RDDs, errorsRDD and warningsRDD, and then combining both RDDs using the union transformation. To illustrate this using PySpark code, let's first create an inputRDD from a local file using SparkContext's textFile method; next, we use two filter transformations to create the two RDDs errorsRDD and warningsRDD; finally, using the union transformation, we combine them. So far you have seen RDD Transformations, but after applying Transformations, at some point you'll want to actually do something with your dataset. This is when Actions come into the picture.

8. RDD Actions

Actions are the operations that are applied on RDDs to return a value after running a computation. The four basic actions that you'll learn in this lesson are collect, take, first and count.

9. collect() and take() Actions

The collect action returns the complete list of elements from the RDD, whereas take(N) returns the first N elements of the RDD. Continuing the map transformation example, executing collect returns all elements, i.e., 1, 4, 9, 16, from the RDD_map RDD that you created earlier. Similarly, here is an example of the take(2) action, which returns the first 2 elements, i.e., 1 and 4, from the RDD_map RDD.

10. first() and count() Actions

Sometimes you just want the first element of the RDD. The first action returns the first element in an RDD; it is similar to take(1). Here is an example of the first action, which returns the first element, i.e., 1, from the RDD_map RDD. Finally, the count action returns the total number of rows/elements in the RDD. Here is an example of the count action counting the elements in the RDD_flatmap RDD. The result indicates that there are 5 elements in the RDD_flatmap RDD.

11. Let's practice RDD operations

It's time for you to practice RDD operations in the PySpark shell now.