# RDD

## Closures

Code that modifies variables outside its own scope from inside an [RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-) operation may behave differently depending on whether execution is happening within the same JVM.

```python
data = [1,2,3,4,5]
counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)
```

```
Counter value:  0
```


In cluster mode, the variables within the closure sent to each executor are copies. When **counter** is referenced within the ```foreach``` function, it is therefore no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node, but it is not visible to the executors. Use an [*Accumulator*](https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators) instead when tasks need to update a shared total.
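
As a minimal sketch of the accumulator approach, the counter example could be rewritten as below. The helper name `sum_with_accumulator` is hypothetical, and `sc` is assumed to be the SparkContext already available in the notebook.

```python
def sum_with_accumulator(sc, data):
    """Sum `data` on the executors through an accumulator instead of a global.

    `sc` is assumed to be an existing SparkContext (hypothetical helper).
    """
    total = sc.accumulator(0)      # created on the driver, starts at 0
    rdd = sc.parallelize(data)

    def add_to_total(x):
        # Tasks can only add to the accumulator; they cannot read its value.
        total.add(x)

    rdd.foreach(add_to_total)      # action: runs add_to_total on the executors
    return total.value             # the merged value is read back on the driver
```

With the notebook's `sc`, calling `sum_with_accumulator(sc, [1, 2, 3, 4, 5])` should return 15, because Spark merges the updates from every task back into the driver-side accumulator.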

Another common idiom is attempting to **print out** the elements of an RDD. In cluster mode, however, output written to **stdout** by the executors goes to each executor’s **stdout**, not the driver’s, so **stdout** on the driver won’t show it. To print all elements on the driver, one can use the ```collect()``` method to first bring the RDD to the driver node. This can cause the driver to run out of memory, though, because ```collect()``` fetches the entire RDD to a single machine. If you only need to print a few elements of the RDD, a safer approach is to use ```take()```, e.g. ```rdd.take(100)```, or to downsample with ```sample()``` first.
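
The safer printing pattern can be sketched as a small helper. The name `print_preview` and the default of 5 elements are assumptions made for illustration; `rdd` is any existing RDD.

```python
def print_preview(rdd, n=5):
    """Print at most `n` elements of `rdd` on the driver and return them."""
    preview = rdd.take(n)      # only up to `n` elements are sent to the driver
    for element in preview:
        print(element)         # runs on the driver, so it shows in driver stdout
    return preview
```

Because `take(n)` returns an ordinary Python list, the loop and the `print` calls run on the driver rather than on the executors.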

## Run on Yarn

<img src="images/spark_cluster.png" style="width: 600px;"/>

There are two deploy modes that can be used to launch Spark applications on YARN. In *cluster mode*, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In *client mode*, the driver runs in the client process, and the application master is only used for requesting resources from YARN.

Unlike other cluster managers supported by Spark, where the master’s address is specified in the ```--master``` parameter, in YARN mode the **ResourceManager**’s address is picked up from the Hadoop configuration. The ```--master``` parameter is therefore simply ```yarn```.
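
To make the two deploy modes concrete, the sketch below assembles a `spark-submit` command line in Python. The helper `build_submit_command` and the script name `my_app.py` are hypothetical; `--master yarn` and `--deploy-mode` are the standard `spark-submit` options, and the Hadoop configuration is assumed to be reachable through `HADOOP_CONF_DIR` or `YARN_CONF_DIR`.

```python
def build_submit_command(app_path, deploy_mode="cluster"):
    """Return the spark-submit argument list for a YARN deployment."""
    if deploy_mode not in ("cluster", "client"):
        raise ValueError("deploy_mode must be 'cluster' or 'client'")
    return [
        "spark-submit",
        "--master", "yarn",            # no host:port, the address comes from Hadoop config
        "--deploy-mode", deploy_mode,  # where the driver runs
        app_path,                      # hypothetical application script
    ]

print(" ".join(build_submit_command("my_app.py")))
# spark-submit --master yarn --deploy-mode cluster my_app.py
```

The resulting list could be passed to `subprocess.run` on a machine where `spark-submit` is on the `PATH`.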

### Debugging
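In YARN terminology, executors and application masters run inside “containers”. YARN has two modes for handling container logs after an application has completed, depending on whether log aggregation is turned on. With aggregation on, the logs are copied to HDFS and can be viewed through: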

- HDFS Shell / API
- Spark Web UI under the Executors Tab

### Web Interfaces
Every SparkContext launches a web UI, by default on port 4040, that displays useful information about the application. This includes:

- A list of scheduler stages and tasks
- A summary of RDD sizes and memory usage
- Environmental information
- Information about the running executors

### Yarn

<img src="images/yarn_architecture.gif" style= "width: 600px"/>

The fundamental idea of [YARN](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) is to split up the functionalities of resource management and job scheduling/monitoring into separate daemons. The idea is to have a global **ResourceManager (RM)** and per-application **ApplicationMaster (AM)**.

YARN supports the notion of **resource reservation** via the [ReservationSystem](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ReservationSystem.html), a component that allows users to specify a profile of resources over time and temporal constraints (e.g., deadlines), and reserve resources to ensure the predictable execution of important jobs. The ReservationSystem tracks resources over time, performs admission control for reservations, and dynamically instructs the underlying scheduler to ensure that the reservation is fulfilled.

#### ResourceManager
The ResourceManager has two main components: the **Scheduler** and the **ApplicationsManager**.

The **Scheduler** is responsible for allocating resources to the various running applications, subject to familiar constraints of capacities, queues, etc. The Scheduler is a pure scheduler in the sense that it performs no monitoring or tracking of status for the application. It also offers no guarantees about restarting tasks that fail due to either application failure or hardware failure. The Scheduler performs its scheduling function based on the resource requirements of the applications; it does so based on the abstract notion of a resource **Container**, which incorporates elements such as **memory, CPU, disk, network, etc**.

The **Scheduler** has a pluggable policy which is responsible for partitioning the cluster resources among the various queues, applications etc. The current schedulers such as the CapacityScheduler and the FairScheduler would be some examples of plug-ins.

#### ApplicationsManager
The **ApplicationsManager** is responsible for accepting job submissions, negotiating the first container for executing the application-specific ApplicationMaster, and providing the service for restarting the ApplicationMaster container on failure. The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status, and monitoring for progress.

## Transformations

Transformations create a new RDD from an existing one and are evaluated lazily, such as: ```map(), filter(), groupByKey(), distinct(), sample()```

## Actions

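Actions trigger the computation and return a result to the driver (or run a side effect, as ```foreach()``` does), such as: ```reduce(), collect(), take(), foreach(), count()```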
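
A short sketch of how transformations and actions combine is shown below. The helper `even_squares` is hypothetical, and `sc` is assumed to be an existing SparkContext.

```python
def even_squares(sc, numbers):
    """Return (count, sorted values) of the distinct even squares of `numbers`."""
    rdd = sc.parallelize(numbers)

    # Transformations: each call only describes a new RDD, nothing runs yet.
    squares = rdd.map(lambda x: x * x)
    evens = squares.filter(lambda x: x % 2 == 0).distinct()

    # Actions: these trigger the computation and bring results to the driver.
    return evens.count(), sorted(evens.collect())
```

For the input `[1, 2, 2, 3, 4]` this should give `(2, [4, 16])`. The result of `collect()` is sorted on the driver because `distinct()` does not guarantee any ordering.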

## Persistence

```cache()``` vs ```persist()```:
- You can mark an RDD to be persisted using the ```persist()``` or ```cache()``` methods on it.
- Each persisted RDD can be stored using a different storage level, passed as an argument to ```persist()```.
- The ```cache()``` method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory).
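
The difference between the two calls can be sketched as follows. The helper `count_twice` is hypothetical; `rdd` is any existing RDD, and `storage_level` is assumed to be a `pyspark.StorageLevel` constant such as `StorageLevel.MEMORY_AND_DISK`.

```python
def count_twice(rdd, storage_level=None):
    """Persist `rdd`, run two actions on it, then release the storage."""
    if storage_level is None:
        rdd.cache()                  # shorthand for the default storage level
    else:
        rdd.persist(storage_level)   # explicit level chosen by the caller

    first = rdd.count()    # first action computes the RDD and stores its partitions
    second = rdd.count()   # later actions can reuse the stored partitions
    rdd.unpersist()        # free the storage once the RDD is no longer needed
    return first, second
```

Persisting only pays off when the same RDD feeds more than one action, which is why the sketch calls `count()` twice.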

# Configure Native BLAS/LAPACK for Spark

For the basic procedure, refer to [1. Use Native BLAS/LAPACK in Apache Spark](html/Use%20Native%20BLAS_LAPACK%20in%20Apache%20Spark.htm) and [2. the official Spark ML guide](https://spark.apache.org/docs/latest/ml-guide.html#dependencies).

In more detail, Step 2 of section 2.2 in [1] requires creating a Maven project and adding the following content to `pom.xml`. In Step 3, the two parameters must include the *jar* directory.

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mycompany.app</groupId>
  <artifactId>my-app</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>my-app</name>
  <url>http://maven.apache.org</url>
  <build>
    <plugins>
      <!-- any other plugins -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.github.fommil.netlib</groupId>
      <artifactId>all</artifactId>
      <version>1.1.2</version>
      <type>pom</type>
    </dependency>
  </dependencies>
</project>
```