## Phase 4.35

# PySpark

## Objectives
- Get an introduction to <a href='#big_data'>Big Data</a>.
    - Discuss <a href='#parallel'>Parallel and Distributed Computing</a>.
    - Look at the <a href='#mapreduce'>MapReduce</a> process.
- Get started with <a href='#start'>Docker and PySpark</a>.
    - Discuss <a href='#rdd'>RDDs and DataFrames</a>.
- Code through an example in PySpark in <a href='#colab'>Google Colab</a>.

<a id='big_data'></a>

# Intro to Big Data
## What is *Big Data*?
>***Big data is high-volume, high-velocity and/or high-variety information assets that demand cost-effective, innovative forms of information processing that enable enhanced insight, decision making, and process automation***.
> *https://www.gartner.com/en/information-technology/glossary/big-data*

<img src='./images/3_components.png' width=300>

### Velocity
<img src='./images/velo.jpeg' width=500>

### Variety
<img src='./images/variety.png' width=300>

## Big Data Analytics
The key activities associated with big data analytics are reflected in four main areas: 

- Warehousing and distribution.
- Storage.
- Computational platforms.
- Analyses, visualization, and evaluation.

<img src='./images/tech_stack.png'>

*Such a framework can be applied for knowledge discovery and informed decision-making in big data-driven organizations.*

<a id='parallel'></a>

## Parallel & Distributed Computing
- **MapReduce** is a programming paradigm that lets big data analytics scale across hundreds or thousands of servers.


> The term "MapReduce" refers to two distinct tasks. 
> 
> - The first is the **Map** job, which takes one set of data and transforms it into another set of data, where individual elements are broken down into tuples **(key/value pairs)**.
>
> - The **Reduce** job takes the output from a map as input and combines those data tuples into a smaller set of tuples.


- The MapReduce programming paradigm is designed to allow **parallel and distributed processing** of large sets of data (also known as big data). MapReduce allows us to convert such big datasets into sets of **tuples** in the form of **key: value** pairs.


- In principle, any data can be mapped to **key: value** pairs.
- Keys and values themselves can be of any data type.


> In simpler terms, **MapReduce uses parallel distributed computing to turn big data into regular data.**
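
To make the key: value idea concrete, here is a minimal single-machine sketch in plain Python. The city temperature records and the helper names `map_record` and `reduce_values` are made up for illustration; a real MapReduce framework would run these steps across many machines.

```python
from collections import defaultdict

# Hypothetical raw records ("city,temperature in Celsius"), invented for this sketch.
records = [
    "Oslo,3.5",
    "Cairo,31.0",
    "Oslo,5.0",
    "Lima,19.5",
    "Cairo,29.0",
]


def map_record(record):
    """Map step: turn one raw record into a (key, value) tuple."""
    city, temperature = record.split(",")
    return (city, float(temperature))


def reduce_values(values):
    """Reduce step: collapse all values for one key into a single value."""
    return max(values)


# Map every record to a (key, value) pair.
pairs = [map_record(record) for record in records]

# Group the values by key, then reduce each group to one value.
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

max_by_city = {key: reduce_values(values) for key, values in grouped.items()}
print(max_by_city)  # {'Oslo': 5.0, 'Cairo': 31.0, 'Lima': 19.5}
```

Here the keys are strings and the values are floats, but the same pattern works for any types that can be grouped and combined.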

### Distributed Processing

- *A distributed processing system is a group of computers in a network working in tandem to accomplish a task.*
<img src='./images/types_of_network.png'>

### Parallel Processing Systems

With parallel computing:
1. A larger problem is broken up into smaller pieces.
2. Every part of the problem follows a series of instructions.
3. The instructions for the different pieces are executed simultaneously on different processors.
4. All of the answers are collected from the small problems and combined into one final answer.
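
The four steps above can be sketched with Python's standard `concurrent.futures` module. This is only an illustration of the split / execute / combine structure: the helper names `chunked` and `sum_of_squares` are invented here, and a thread pool is used for simplicity. For CPU-bound work in CPython, a process pool or a cluster framework such as Spark would be needed to get true parallel speedups.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(items, n_chunks):
    """Step 1: break the larger problem into roughly equal pieces."""
    size = -(-len(items) // n_chunks)  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]


def sum_of_squares(chunk):
    """Step 2: the same series of instructions is applied to every piece."""
    return sum(x * x for x in chunk)


numbers = list(range(1, 101))
chunks = chunked(numbers, n_chunks=4)

# Step 3: hand the pieces to a pool of workers that run concurrently.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_results = list(pool.map(sum_of_squares, chunks))

# Step 4: combine the partial answers into one final answer.
total = sum(partial_results)
print(total)  # 338350
```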

---

In the image below, you can see a simple example of a process being broken up and completed both sequentially and in parallel.

<img src='./images/parallel.png'>

<a id='mapreduce'></a>
### MapReduce Example
Suppose a data scientist receives a data document listing the animals found at each zoo. Here are the first five zoos they read over:

| Animals              |
|----------------------|
| lion tiger bear      |
| lion giraffe         |
| giraffe penguin      |
| penguin lion giraffe |
| koala giraffe        |


Let's now look at how you would use the MapReduce framework in this simple word count example that could be generalized to much more data.

<img src="./images/word_count.png">

#### 1. MAP Task (Splitting & Mapping)
- The data is transformed into **key:value** pairs and split into fragments, which are then assigned to map tasks. 
    - Each computing cluster is assigned a number of map tasks, which are subsequently distributed among its nodes.

- We will then use the map function to create key:value pairs represented by:   
> `{animal}: {# of animals per zoo}`

- After processing of the original key:value pairs, some __intermediate__ key:value pairs are generated. 
    - The intermediate key:value pairs are __sorted by their key values__ to create a new list of key:value pairs.
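
As a rough single-machine sketch of the map task for the zoo table, the snippet below emits one `(animal, 1)` pair per animal and then sorts the intermediate pairs by key. The function name `map_zoo` is hypothetical, and treating each zoo as its own input fragment is an assumption made for illustration.

```python
# The five zoos from the table above, one string per zoo.
zoos = [
    "lion tiger bear",
    "lion giraffe",
    "giraffe penguin",
    "penguin lion giraffe",
    "koala giraffe",
]


def map_zoo(line):
    """Emit an (animal, 1) pair for every animal listed for one zoo."""
    return [(animal, 1) for animal in line.split()]


# Each zoo (input fragment) could be handled by a different map task.
mapped = [map_zoo(line) for line in zoos]

# Flatten the per-zoo outputs and sort the intermediate pairs by key.
intermediate = sorted(pair for pairs in mapped for pair in pairs)
print(intermediate)
```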
    

#### 2. Shuffling
- The list from the map task is divided into a new set of fragments.
    - This step sorts and shuffles the mapped objects into an order or grouping that makes them easier to reduce. 

- **The number of these new fragments will be the same as the number of the reduce tasks**. 
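
One way to picture the shuffle is as a partitioning function that sends every pair with the same key to the same fragment. The sketch below assumes two reduce tasks and uses a CRC32 checksum as the partitioner. Both choices are illustrative assumptions, not what any particular framework does.

```python
import zlib
from collections import defaultdict

# Intermediate (animal, 1) pairs as the map tasks might emit them for the zoo example.
intermediate = [
    ("lion", 1), ("tiger", 1), ("bear", 1),
    ("lion", 1), ("giraffe", 1),
    ("giraffe", 1), ("penguin", 1),
    ("penguin", 1), ("lion", 1), ("giraffe", 1),
    ("koala", 1), ("giraffe", 1),
]

NUM_REDUCERS = 2  # hypothetical number of reduce tasks


def partition(key, num_reducers):
    """Pick a reducer for a key; crc32 keeps the choice stable across runs."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


# One fragment per reduce task; each fragment maps key -> list of values.
fragments = [defaultdict(list) for _ in range(NUM_REDUCERS)]
for key, value in intermediate:
    fragments[partition(key, NUM_REDUCERS)][key].append(value)

for index, fragment in enumerate(fragments):
    print(index, dict(sorted(fragment.items())))
```

Because the partitioner depends only on the key, all of the pairs for a given animal land in the same fragment, so a single reduce task sees every value it needs.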

#### 3. REDUCE Task (Reducing)

- Now, every properly shuffled segment will have a reduce task applied to it. 

    - After the task is completed, the final output is written onto a file system. 
    - The underlying file system is usually HDFS (Hadoop Distributed File System). 
    
    
- It's important to note that MapReduce generally only pays off when dealing with large amounts of data. 
    - When working with a small dataset, the overhead of distributing the work means it will usually be faster not to use the MapReduce framework.


- Two groups of entities ensure that the MapReduce task gets done properly:

    1. **Job Tracker**: a "master" node that informs the other nodes which map and reduce jobs to complete.

    2. **Task Tracker**: the "worker" nodes that complete the map and reduce operations.

There are different names for these components depending on the technology used, but there will always be a master node that informs worker nodes what tasks to perform.
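
Returning to the zoo word count, the reduce task itself can be sketched as follows. The `shuffled` dictionary is written out by hand to stand in for the grouped output of the shuffle step, and `reduce_counts` is a hypothetical helper. In a real job the result would be written to a file system such as HDFS rather than printed.

```python
from functools import reduce
from operator import add

# Grouped pairs as they might arrive from the shuffle step (zoo example).
shuffled = {
    "bear": [1],
    "giraffe": [1, 1, 1, 1],
    "koala": [1],
    "lion": [1, 1, 1],
    "penguin": [1, 1],
    "tiger": [1],
}


def reduce_counts(key, values):
    """Combine all of the values for one key into a single total."""
    return key, reduce(add, values)


animal_counts = dict(reduce_counts(key, values) for key, values in shuffled.items())
print(animal_counts)
# {'bear': 1, 'giraffe': 4, 'koala': 1, 'lion': 3, 'penguin': 2, 'tiger': 1}
```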

# Intro to Spark
<a id='start'></a>
## Getting Started with PySpark and Docker
### Docker installation directions below:

1. **Install Docker Desktop (or Docker Toolbox if you have Windows 10 Home).**


2. **Pull the pyspark-notebook image (this can take up to 20 minutes!)**

```bash
docker pull jupyter/pyspark-notebook
```

3. **I recommend creating a new folder for all of the pyspark-related labs. (I called mine Docker).**
    - Navigate to this folder containing the cloned repositories (NOT the folder for an individual repo).
    - Whatever folder you are in when you run the `docker run` command in step 4 will show up inside Jupyter.


4A. **Start the container with port forwarding**
```bash
docker run -it --name my-pyspark1 -p 8888:8888 -v "${PWD}:/home/jovyan/work" jupyter/pyspark-notebook 
```

4B. **NOTE: If the port clashes with your local Jupyter notebook server:**
>- Change the port numbers from 8888 to something else (e.g. 8989).
>- Add the `jupyter notebook` launch command to the end with `--no-browser --ip=0.0.0.0 --port=8989` (changing 8989 to whatever port number you used).

- The full command would be:
```bash
docker run -it --name my-pyspark1 -p 8989:8989 -v "${PWD}:/home/jovyan/work" jupyter/pyspark-notebook jupyter notebook --no-browser --ip=0.0.0.0 --port=8989
```

5. **Copy the URL that starts with 127.0.0.1 from the terminal and paste it into your web browser.**
    - NOTE to Windows users using Docker Toolbox:
        - change the ip address of the url displayed from `127.0.0.1` to 

6. To stop the container, press Control+C in your terminal.

7. **To resume the container:**  (note: any pip installs or settings will be saved if you resume a stopped container)

```bash
docker start -ia my-pyspark1
```

8. To remove it:

```bash
docker rm my-pyspark1
```

<a id='rdd'></a>
## RDD & DataFrames

RDD stands for Resilient Distributed Dataset.

<img src='./images/DataFrames.png'>

> *(From DataBricks)*
>
> *https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html*
>
> ### Use RDDs when:
> - You want low-level transformation and actions and control on your dataset;
> - Your data is unstructured, such as media streams or streams of text;
> - You want to manipulate your data with functional programming constructs than domain specific expressions;
> - You don’t care about imposing a schema, such as columnar format, while processing or accessing data attributes by name or column; and
> - You can forgo some optimization and performance benefits available with DataFrames and Datasets for structured and semi-structured data.
>
>
> ### Using Datasets / DataFrames:
> - You want rich semantics, high-level abstractions, and domain specific APIs, use DataFrame or Dataset.
> - Your processing demands high-level expressions, filters, maps, aggregation, averages, sum, SQL queries, columnar access and use of lambda functions on semi-structured data, use DataFrame or Dataset.
> - You want higher degree of type-safety at compile time, want typed JVM objects, take advantage of Catalyst optimization, and benefit from Tungsten’s efficient code generation, use Dataset.
> - You want unification and simplification of APIs across Spark Libraries, use DataFrame or Dataset.
> - You are a R user, use DataFrames.
> - You are a Python user, use DataFrames and resort back to RDDs if you need more control.

# Additional Resources

- <a href='https://medium.datadriveninvestor.com/distributed-data-processing-with-apache-spark-2a5e473b0cb1'>Medium - Distributed Data Processing with Apache Spark</a>
    - A very thorough talk through Spark. Very informative.
    
    
- <a href='https://github.com/apache/spark/blob/v3.1.1-rc3/examples/src/main/python/ml/logistic_regression_with_elastic_net.py'>Official Example - Logistic Regression with PySpark ML Library</a> 
    - From the Apache github. 
    - Uses `pyspark.ml.classification.LogisticRegression` 
    
    
- <a href='http://spark.apache.org/docs/latest/api/python/index.html'>PySpark Documentation</a>
    - Official Documentation

<a id='colab'></a>
# *Colab Notebook*
***Demo Notebook - Google Colab***
> *https://colab.research.google.com/drive/1EO_qvNC6BAx31RaAMlPjYI6Ftj1yjIFH?usp=sharing*