

---



---



> # **Spark Core**


***Big data analysis course***

Presenters :
*   Marziyeh Amiri
*   Zahra Ebadi
*   Elnaz Nasiri

Supervisor : Dr. Farsad Zamani

Islamic Azad University, Science and Research Branch

Fall 1402


---



---



# Actions on RDD of key-value Pairs :
RDDs of key-value pairs support a few additional actions, which are briefly described next.

## 1. countByKey :
The countByKey method counts the occurrences of each unique key in the source RDD.
It returns a Map of key-count pairs.

**Scala :**



```
val pairRdd = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3), ("a", 11), ("b", 22), ("a", 1)))
val countOfEachKey = pairRdd.countByKey
```

**Python :**

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
Building wheels for collected packages: pyspark
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark import SparkContext

# Create a SparkContext
sc = SparkContext("local", "example")

In [3]:
# Create an RDD
rdd = sc.parallelize([("a", 1), ("b", 2), ("c", 3), ("a", 11), ("b", 22), ("a", 1)])

# Count occurrences of each key
result = sorted(rdd.countByKey().items())

# Print the result
print(result)

[('a', 3), ('b', 2), ('c', 1)]


---
## 2. lookup :
The lookup method takes a key as input and returns a sequence of all the values mapped to that key in the
source RDD.

**Scala :**

```
val pairRdd = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3), ("a", 11), ("b", 22), ("a", 1)))
val values = pairRdd.lookup("a")
```
**Python :**

In [4]:
# Create a pair RDD
pair_rdd = sc.parallelize([("a", 1), ("b", 2), ("c", 3), ("a", 11), ("b", 22), ("a", 1)])

# Use lookup to get all values for key "a"
values = pair_rdd.lookup("a")

# Print the result
print(values)

[1, 11, 1]


---
# Actions on RDD of Numeric Types :
RDDs containing data elements of type Integer, Long, Float, or Double support a few additional actions that
are useful for statistical analysis. The commonly used actions from this group are briefly described next.


## 1. mean :
The mean method returns the average of the elements in the source RDD.

**Scala :**

```
val numbersRdd = sc.parallelize(List(2, 5, 3, 1))
val mean = numbersRdd.mean
```
**Python :**

In [5]:
# Create an RDD of numbers
numbers_rdd = sc.parallelize([2, 5, 3, 1])

# Calculate the mean
mean = numbers_rdd.mean()

# Print the result
print(mean)

2.75


---
## 2. stdev :
The stdev method returns the standard deviation of the elements in the source RDD.

**Scala :**

```
val numbersRdd = sc.parallelize(List(2, 5, 3, 1))
val stdev = numbersRdd.stdev
```

**Python :**

In [6]:
# Create an RDD of numbers
numbers_rdd = sc.parallelize([2, 5, 3, 1])

# Calculate the standard deviation
stdev = numbers_rdd.stdev()

# Print the result
print(stdev)

1.479019945774904


---
## 3. sum :
The sum method returns the sum of the elements in the source RDD.

**Scala :**

```
val numbersRdd = sc.parallelize(List(2, 5, 3, 1))
val sum = numbersRdd.sum
```
**Python :**

In [7]:
# Create an RDD of numbers
numbers_rdd = sc.parallelize([2, 5, 3, 1])

# Calculate the sum
total_sum = numbers_rdd.sum()

# Print the result
print(total_sum)

11


---
## 4. variance :
The variance method returns the variance of the elements in the source RDD.
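As a quick cross-check of the numbers in this section, the following plain-Python sketch (standard library only, no Spark) recomputes the statistics for the sample list. It assumes that `stdev` and `variance` are the population versions (dividing by N), which is consistent with the outputs 1.479019945774904 and 2.1875 shown in this notebook. The RDD API also offers `sampleStdev` and `sampleVariance` for the N - 1 versions.

```python
import math
import statistics

numbers = [2.0, 5.0, 3.0, 1.0]

mean = statistics.mean(numbers)
pop_variance = statistics.pvariance(numbers)     # divides by N
pop_stdev = math.sqrt(pop_variance)
sample_variance = statistics.variance(numbers)   # divides by N - 1

print(mean, pop_variance, pop_stdev, sample_variance)
```

The population variance is the sum of squared deviations from the mean (8.75 here) divided by 4, while the sample variance divides the same sum by 3.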

**Scala :**
```
val numbersRdd = sc.parallelize(List(2, 5, 3, 1))
val variance = numbersRdd.variance
```
**Python :**

In [8]:
# Create an RDD of numbers
numbers_rdd = sc.parallelize([2, 5, 3, 1])

# Calculate the variance
variance = numbers_rdd.variance()

# Print the result
print(variance)

2.1875


---
# Saving an RDD :
Generally, after data is processed, the results are saved to disk. Spark allows an application developer to save an RDD to any Hadoop-supported storage system.
This section presents commonly used RDD methods for saving an RDD to a file.

## 1. saveAsTextFile :
The saveAsTextFile method saves the elements of the source RDD in the specified directory on any
Hadoop-supported file system. Each RDD element is converted to its string representation and stored as a
line of text.
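To make the output layout concrete, here is a plain-Python sketch (no Spark involved) of what saving as text amounts to: one file per partition, with one `str(element)` per line. The helper `save_as_text` and the two-partition split are hypothetical and used only for illustration; the `part-0000N` naming mirrors the part files Spark writes into the output directory.

```python
import os
import tempfile

def save_as_text(partitions, out_dir):
    """Hypothetical helper: write each partition to its own part file."""
    os.makedirs(out_dir)  # raises if the directory already exists
    for index, partition in enumerate(partitions):
        path = os.path.join(out_dir, f"part-{index:05d}")
        with open(path, "w") as f:
            for element in partition:
                f.write(str(element) + "\n")

# Numbers divisible by 1000, split into two illustrative partitions
filtered = [x for x in range(1, 10001) if x % 1000 == 0]
partitions = [filtered[:5], filtered[5:]]

out_dir = os.path.join(tempfile.mkdtemp(), "numbers-as-text")
save_as_text(partitions, out_dir)

part_files = sorted(os.listdir(out_dir))
with open(os.path.join(out_dir, part_files[0])) as f:
    first_part = f.read().splitlines()
print(part_files, first_part)
```

Two details carry over to Spark: the target is a directory rather than a single file, and values read back from it are strings, so numeric data has to be converted again (for example with `int`).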

**Scala :**

```
val numbersRdd = sc.parallelize((1 to 10000).toList)
val filteredRdd = numbersRdd filter { x => x % 1000 == 0}
filteredRdd.saveAsTextFile("numbers-as-text")
```
**Python :**

In [9]:
# Create an RDD of numbers from 1 to 10000
numbers_rdd = sc.parallelize(list(range(1, 10001)))

# Filter the RDD to keep only numbers divisible by 1000
filtered_rdd = numbers_rdd.filter(lambda x: x % 1000 == 0)

# Save the filtered RDD as a text file
filtered_rdd.saveAsTextFile("numbers-as-text")

---
## 2. saveAsObjectFile :
The saveAsObjectFile method saves the elements of the source RDD as serialized Java objects in the
specified directory.

**Scala :**

```
val numbersRdd = sc.parallelize((1 to 10000).toList)
val filteredRdd = numbersRdd filter { x => x % 1000 == 0}
filteredRdd.saveAsObjectFile("numbers-as-object")
```

**Python :**

In [10]:
# Create an RDD of numbers from 1 to 10000
numbers_rdd = sc.parallelize(list(range(1, 10001)))

# Filter the RDD to keep only numbers divisible by 1000
filtered_rdd = numbers_rdd.filter(lambda x: x % 1000 == 0)

# PySpark has no saveAsObjectFile; saveAsPickleFile is the closest
# equivalent and stores the elements as pickled Python objects
filtered_rdd.saveAsPickleFile("numbers-as-object")

---
## 3. saveAsSequenceFile :
The saveAsSequenceFile method saves an RDD of key-value pairs in SequenceFile format. An RDD of key-value pairs can also be saved in text format using the saveAsTextFile method.

**Scala :**



```
val pairs = (1 to 10000).toList map {x => (x, x*2)}
val pairsRdd = sc.parallelize(pairs)
val filteredPairsRdd = pairsRdd filter { case (x, y) => x % 1000 ==0 }
filteredPairsRdd.saveAsSequenceFile("pairs-as-sequence")
filteredPairsRdd.saveAsTextFile("pairs-as-text")
```



**Python :**

In [11]:
# Create pairs as a list
pairs = [(x, x*2) for x in range(1, 10001)]

# Create an RDD of pairs
pairs_rdd = sc.parallelize(pairs)

# Filter the RDD to keep only pairs where the first element is divisible by 1000
filtered_pairs_rdd = pairs_rdd.filter(lambda pair: pair[0] % 1000 == 0)

# Save the filtered RDD as a SequenceFile
filtered_pairs_rdd.saveAsSequenceFile("pairs-as-sequence")

# Save the filtered RDD as a text file
filtered_pairs_rdd.saveAsTextFile("pairs-as-text")



---


# Lazy Operations :
RDD creation and transformation methods are lazy operations. Spark does not immediately perform any
computation when an application calls a method that returns an RDD. For example, when you read a file
from HDFS using the textFile method of SparkContext, Spark does not immediately read the file from disk.
Similarly, RDD transformations, which return a new RDD, are lazily computed. Spark just keeps track of
the transformations applied to an RDD.

Let’s consider the following sample code :

**Scala :**



```
val lines = sc.textFile("...")
val errorLines = lines filter { l => l.contains("ERROR")}
val warningLines = lines filter { l => l.contains("WARN")}
```


**Python :**

In [13]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [15]:
# Read the text file
lines = sc.textFile("drive/MyDrive/sample_text.txt")

# Filter lines containing "ERROR"
error_lines = lines.filter(lambda l: "ERROR" in l)

# Filter lines containing "WARN"
warning_lines = lines.filter(lambda l: "WARN" in l)

# Print the results
print("Lines containing 'ERROR':")
print(error_lines.collect())

print("\nLines containing 'WARN':")
print(warning_lines.collect())

Lines containing 'ERROR':

Lines containing 'WARN':



# Action Triggers Computation :
RDD transformations are computed when an application calls an action method of an RDD or saves an RDD to a storage system. Saving an RDD to a storage system is considered an action, even though it does not return a value to the driver program.
Spark's lazy evaluation enables efficient RDD computations. Transformations are deferred until an action is triggered, which allows Spark to optimize operations by pipelining them and minimizing unnecessary data transfer. When an action is invoked, Spark recursively processes the RDD's dependencies, starting from the root RDD, to generate the desired result for the driver program.
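The behavior described above can be modeled in a few lines of plain Python. The sketch below is a toy stand-in, not Spark code: the class `LazyDataset`, the function `read_source`, and the sample lines are all made up for illustration. It only shows that transformations are recorded and that nothing is read until an action runs.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, read_source, transformations=()):
        self._read_source = read_source
        self._transformations = tuple(transformations)

    def filter(self, predicate):
        # Transformation: return a new dataset and do no work yet
        return LazyDataset(self._read_source, self._transformations + (predicate,))

    def collect(self):
        # Action: read the source, then apply every recorded filter in order
        rows = self._read_source()
        for predicate in self._transformations:
            rows = [row for row in rows if predicate(row)]
        return rows

reads = []

def read_source():
    reads.append("read")  # record each time the source is actually read
    return ["INFO start", "ERROR disk full", "WARN low memory", "ERROR timeout"]

lines = LazyDataset(read_source)
error_lines = lines.filter(lambda l: "ERROR" in l)
reads_before_action = len(reads)   # nothing has run yet

errors = error_lines.collect()     # the action triggers the read and the filter
reads_after_action = len(reads)
print(reads_before_action, reads_after_action, errors)
```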










---


# Caching :
Besides storing data in memory, caching an RDD plays another important role.
By default, every time an action method is called, Spark traverses the lineage tree of an RDD and computes all the transformations needed to obtain the RDD whose action method was called.

Consider the following example :

**Scala :**

```
val logs = sc.textFile("path/to/log-files")
val errorLogs = logs filter { l => l.contains("ERROR")}
val warningLogs = logs filter { l => l.contains("WARN")}
val errorCount = errorLogs.count
val warningCount = warningLogs.count
```

**Python :**

In [16]:
# Read the text file
logs = sc.textFile("drive/MyDrive/sample_text.txt")

# Filter logs containing "ERROR"
error_logs = logs.filter(lambda l: "ERROR" in l)

# Filter logs containing "WARN"
warning_logs = logs.filter(lambda l: "WARN" in l)

# Count the number of "ERROR" logs
error_count = error_logs.count()

# Count the number of "WARN" logs
warning_count = warning_logs.count()

# Print the results
print("Number of ERROR logs:", error_count)
print("Number of WARN logs:", warning_count)

Number of ERROR logs: 2
Number of WARN logs: 3



The code above reads the log files twice: although textFile is called only once, each of the two calls to the count action recomputes its RDD from the beginning, including the file read. The cost grows in real-world applications with long chains of transformations. When an RDD is cached, Spark computes all the transformations up to that point and effectively creates a checkpoint for that RDD. Caching is itself lazy: the RDD is materialized and stored the first time an action computes it, so the benefit starts with the second action. Subsequent actions reuse the cached data and run faster, which is particularly useful for applications that iterate over the same data multiple times. When an RDD is cached in memory, Spark stores its partitions in the memory of the executors on the worker nodes.
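The difference can be made visible by counting how often the source is read. The following plain-Python sketch is only a model under stated assumptions (the functions `read_logs`, `read_logs_cached`, and `count_matching` and the sample log lines are hypothetical, and no Spark API is used); it mirrors the two-action pattern of the example above.

```python
stats = {"reads": 0}
cache = {}

def read_logs():
    """Stands in for textFile: every call re-reads the (made-up) log data."""
    stats["reads"] += 1
    return ["ERROR a", "WARN b", "INFO c", "WARN d", "ERROR e"]

def read_logs_cached():
    """Same data, but kept after the first read, like a cached RDD."""
    if "logs" not in cache:
        cache["logs"] = read_logs()
    return cache["logs"]

def count_matching(load, word):
    """Stands in for filter + count: an action that pulls data through `load`."""
    return sum(1 for line in load() if word in line)

# Without caching: each action goes back to the source
count_matching(read_logs, "ERROR")
count_matching(read_logs, "WARN")
reads_without_cache = stats["reads"]

# With caching: the first action fills the cache, the second one reuses it
stats["reads"] = 0
error_count = count_matching(read_logs_cached, "ERROR")
warning_count = count_matching(read_logs_cached, "WARN")
reads_with_cache = stats["reads"]

print(reads_without_cache, reads_with_cache, error_count, warning_count)
```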



---


# RDD Caching Methods :
The RDD class provides two methods to cache an RDD: **cache** and **persist.**


## 1. cache :
The cache method stores an RDD in the memory of the executors across a cluster. It essentially materializes
an RDD in memory.
The example shown earlier can be optimized using the cache method as shown next :

**Scala :**


```
val logs = sc.textFile("path/to/log-files")
val errorsAndWarnings = logs filter { l => l.contains("ERROR") || l.contains("WARN")}
errorsAndWarnings.cache()
val errorLogs = errorsAndWarnings filter { l => l.contains("ERROR")}
val warningLogs = errorsAndWarnings filter { l => l.contains("WARN")}
val errorCount = errorLogs.count
val warningCount = warningLogs.count
```

**Python :**

In [17]:
# Read the text file
logs = sc.textFile("drive/MyDrive/sample_text.txt")

# Filter logs containing "ERROR" or "WARN"
errors_and_warnings = logs.filter(lambda l: "ERROR" in l or "WARN" in l)

# Cache the RDD for better performance if reused
errors_and_warnings.cache()

# Filter logs containing only "ERROR"
error_logs = errors_and_warnings.filter(lambda l: "ERROR" in l)

# Filter logs containing only "WARN"
warning_logs = errors_and_warnings.filter(lambda l: "WARN" in l)

# Count the number of "ERROR" logs
error_count = error_logs.count()

# Count the number of "WARN" logs
warning_count = warning_logs.count()

# Print the results
print("Number of ERROR logs:", error_count)
print("Number of WARN logs:", warning_count)

Number of ERROR logs: 2
Number of WARN logs: 3




---


## 2. persist :
The persist method is a generic version of the cache method. It allows an RDD to be stored in memory,
disk, or both. It optionally takes a storage level as an input parameter. If persist is called without any
parameter, its behavior is identical to that of the cache method.

**Scala :**

```
val lines = sc.textFile("...")
lines.persist()
```

The persist method supports the following common storage options:
*	 *MEMORY_ONLY* : When an application calls the persist method with the MEMORY_ONLY flag, Spark stores RDD partitions in memory on the worker nodes as deserialized Java objects. If an RDD partition does not fit in memory on a worker node, it is computed on the fly when needed.

*	 *DISK_ONLY* : If persist is called with the DISK_ONLY flag, Spark materializes RDD
partitions and stores them in a local file system on each worker node. This option
can be used to persist intermediate RDDs so that subsequent actions do not have to
start computation from the root RDD.
*	 *MEMORY_AND_DISK* : In this case, Spark stores as many RDD partitions in memory as
possible and stores the remaining partitions on disk.
*	 *MEMORY_ONLY_SER* : In this case, Spark stores RDD partitions in memory as serialized
Java objects. A serialized Java object consumes less memory, but is more CPU intensive to read. This option allows a trade-off between memory consumption and
CPU utilization.
*	 *MEMORY_AND_DISK_SER* : Spark stores in memory as serialized Java objects as many
RDD partitions as possible. The remaining partitions are saved on disk.

**Python :**

In [18]:
from pyspark.storagelevel import StorageLevel

# Read the text file
lines = sc.textFile("drive/MyDrive/sample_text.txt")

# Persist the RDD with the MEMORY_ONLY storage level
lines.persist(StorageLevel.MEMORY_ONLY)

# Alternative: the DISK_ONLY storage level
#lines.persist(StorageLevel.DISK_ONLY)

# Alternative: the MEMORY_AND_DISK storage level
#lines.persist(StorageLevel.MEMORY_AND_DISK)

# PySpark always serializes stored data with pickle, so it has no separate
# *_SER levels. The *_2 levels below replicate each partition on two nodes.
#lines.persist(StorageLevel.MEMORY_ONLY_2)
#lines.persist(StorageLevel.MEMORY_AND_DISK_2)

# Action 1: Count the number of lines
line_count = lines.count()
print(f"Number of lines: {line_count}")

# Action 2: Display the first 10 lines
first_10_lines = lines.take(10)
print("First 10 lines:")
for line in first_10_lines:
    print(line)

Number of lines: 4
First 10 lines:
In the intricate landscape of software development, encountering an ERROR can be both perplexing and pivotal. 
As developers navigate through code, the vigilant presence of WARN messages becomes crucial, serving as beacons signaling potential pitfalls. 
These WARN messages act as guardians, cautioning developers about impending issues and providing insights to preemptively address vulnerabilities. 




---
### RDD Caching Is Fault Tolerant :
A Spark application will not crash if a node with cached RDD partitions fails. Spark automatically recreates the partitions that were stored on the failed node and caches them on another node. It uses RDD lineage information to recompute the lost cached partitions.
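The recovery idea can be sketched in plain Python: as long as the recipe for a partition (its lineage) is known, a lost cached copy can be rebuilt from the source. Everything in this sketch is hypothetical (the partition ids, the sample lines, and the helpers `compute_partition` and `get_partition`); it models the principle only and is not how Spark is implemented.

```python
# Source data, split into two partitions
source_partitions = {
    0: ["INFO a", "ERROR b"],
    1: ["WARN c", "ERROR d"],
}

def compute_partition(partition_id):
    """Lineage of the toy dataset: filter one source partition for ERROR lines."""
    return [l for l in source_partitions[partition_id] if "ERROR" in l]

# Cache every computed partition
cache = {pid: compute_partition(pid) for pid in source_partitions}

def get_partition(partition_id):
    # Recompute from lineage if the cached copy is missing, then cache it again
    if partition_id not in cache:
        cache[partition_id] = compute_partition(partition_id)
    return cache[partition_id]

del cache[1]                    # simulate losing the node that held partition 1
was_lost = 1 not in cache
recovered = get_partition(1)    # rebuilt from the source via the lineage
print(was_lost, recovered, sorted(cache))
```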

### Cache Memory Management :
Spark automatically manages cache memory using an LRU (least recently used) algorithm. It removes old
RDD partitions from cache memory when needed. In addition, the RDD API includes a method called
unpersist(). An application can call this method to manually remove RDD partitions from memory.
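A minimal sketch of LRU eviction, assuming a cache that holds a fixed number of partitions, is shown below. The class `LruPartitionCache` and the partition keys are made up for illustration, and Spark's actual memory manager is considerably more involved; the sketch only shows the eviction order and a manual removal in the spirit of unpersist().

```python
from collections import OrderedDict

class LruPartitionCache:
    """Toy LRU cache: the least recently used entry is evicted first."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._entries = OrderedDict()  # oldest entry first

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def put(self, key, value):
        self._entries[key] = value
        self._entries.move_to_end(key)
        while len(self._entries) > self.capacity:
            self._entries.popitem(last=False)  # evict the least recently used

    def unpersist(self, key):
        self._entries.pop(key, None)  # manual removal

    def keys(self):
        return list(self._entries)

cache = LruPartitionCache(capacity=2)
cache.put("rdd1_p0", [1, 2])
cache.put("rdd1_p1", [3, 4])
cache.get("rdd1_p0")           # touch p0, so rdd1_p1 becomes the oldest entry
cache.put("rdd2_p0", [5, 6])   # over capacity: rdd1_p1 is evicted
after_eviction = cache.keys()

cache.unpersist("rdd1_p0")
after_unpersist = cache.keys()
print(after_eviction, after_unpersist)
```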



---


# Spark Jobs :
Spark applications are built from RDD operations: transformations, actions, and caching methods. A job is the set of computations that Spark performs to return the result of an action to the driver program. Calling an action method on an RDD triggers a job, which covers everything from reading the data to returning the result. If an RDD in the lineage is cached, the job continues from the cached point instead of starting at the root RDD. Spark breaks a job into a Directed Acyclic Graph (DAG) of stages, and shuffle boundaries determine how tasks are grouped into stages. The tasks of a stage run in parallel on the cluster nodes, and the scheduler takes data locality into account when placing them. If a node fails while running a task, Spark resubmits the task to another node.
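The grouping of operations into stages can be illustrated with a deliberately simplified sketch. It assumes a linear chain of operations and a hand-picked set of shuffle-inducing operation names; the function `split_into_stages` and the sample job are hypothetical, and a real Spark DAG can branch and merge.

```python
# Assumed, non-exhaustive set of operations that introduce a shuffle
SHUFFLE_OPS = {"reduceByKey", "groupByKey", "sortByKey", "join"}

def split_into_stages(operations):
    """Start a new stage whenever an operation requires a shuffle."""
    stages = [[]]
    for op in operations:
        if op in SHUFFLE_OPS and stages[-1]:
            stages.append([])  # shuffle boundary: close the current stage
        stages[-1].append(op)
    return stages

job = ["textFile", "flatMap", "map", "reduceByKey", "filter", "sortByKey", "collect"]
stages = split_into_stages(job)
for number, stage in enumerate(stages):
    print(number, stage)
```

In this model the operations between two shuffles are pipelined within one stage, which matches the idea that only a shuffle forces data to move between nodes.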



---


# Shared Variables :
Spark uses a shared-nothing architecture: data is partitioned across the nodes of a cluster, and there is no global memory that tasks can share. The driver program and the tasks exchange data through messages. By default, Spark sends a copy of each driver variable referenced by a task along with that task. This is redundant when many tasks use the same variable, and it does not allow tasks running on different nodes to update a global variable. Shared variables address these cases.


## 1. Broadcast Variables :
Broadcast variables let Spark send read-only data to each worker node only once. The data is cached in deserialized form in executor memory as a read-only variable. This is particularly beneficial for multi-stage jobs and for tasks that reference the same driver variable, because it avoids the cost of deserializing the data before each task. Broadcast variables are created with the broadcast method of the SparkContext class, and tasks read the shared data through the value of the broadcast variable.

Consider an application where we want to generate transaction details from e-commerce transactions. In
a real-world application, there would be a master customer table, a master item table, and a transactions table.

**Scala :**


```
case class Transaction(id: Long, custId: Int, itemId: Int)
case class TransactionDetail(id: Long, custName: String, itemName: String)
val customerMap = Map(1 -> "Tom", 2 -> "Harry")
val itemMap = Map(1 -> "Razor", 2 -> "Blade")
val transactions = sc.parallelize(List(Transaction(1, 1, 1), Transaction(2, 1, 2)))
val bcCustomerMap = sc.broadcast(customerMap)
val bcItemMap = sc.broadcast(itemMap)
val transactionDetails = transactions.map{t => TransactionDetail(
 t.id, bcCustomerMap.value(t.custId), bcItemMap.value(t.itemId))}
transactionDetails.collect
```

**Python :**

In [19]:
# Define plain classes that mirror the Scala case classes
class Transaction:
    def __init__(self, id, custId, itemId):
        self.id = id
        self.custId = custId
        self.itemId = itemId

class TransactionDetail:
    def __init__(self, id, custName, itemName):
        self.id = id
        self.custName = custName
        self.itemName = itemName

# Define the data
customer_map = {1: "Tom", 2: "Harry"}
item_map = {1: "Razor", 2: "Blade"}
transactions_data = [(1, 1, 1), (2, 1, 2)]

# Create RDDs
transactions_rdd = sc.parallelize([Transaction(*t) for t in transactions_data])
bc_customer_map = sc.broadcast(customer_map)
bc_item_map = sc.broadcast(item_map)

# Map the transactions to TransactionDetail
transaction_details_rdd = transactions_rdd.map(
    lambda t: TransactionDetail(t.id, bc_customer_map.value[t.custId], bc_item_map.value[t.itemId])
)

# Collect the results
result = transaction_details_rdd.collect()

# Print the result
for detail in result:
    print(f"TransactionDetail(id={detail.id}, custName={detail.custName}, itemName={detail.itemName})")


TransactionDetail(id=1, custName=Tom, itemName=Razor)
TransactionDetail(id=2, custName=Tom, itemName=Blade)



Broadcast variables in Spark facilitated an efficient join between customer, item, and transaction datasets by sending data to each node only once, avoiding the network shuffle associated with the join operator in the RDD API. This approach replaced an expensive join operation with a more economical map operation.



---


## 2. Accumulators :
Accumulators in Spark are add-only variables updated by tasks on different nodes and read by the driver program, suitable for implementing counters and aggregations. The SparkContext class facilitates accumulator creation with an optional name for display in the Spark UI, providing operators like add and += for task updates. The accumulator's value can only be read by the driver program using the value method.

Let’s consider an application that needs to filter out and count the number of invalid customer identifiers
in a customer table. In a real-world application, we would read the input data from disk and write the filtered
data back to another file on disk.

**Scala :**


```
case class Customer(id: Long, name: String)
val customers = sc.parallelize(List(
 Customer(1, "Tom"),
 Customer(2, "Harry"),
 Customer(-1, "Paul")))
val badIds = sc.accumulator(0, "Bad id accumulator")
val validCustomers = customers.filter(c => if (c.id < 0) {
 badIds += 1
 false
 } else true
 )
val validCustomerIds = validCustomers.count
val invalidCustomerIds = badIds.value
```

**Python :**

In [20]:
# Define a plain class that mirrors the Scala case class
class Customer:
    def __init__(self, id, name):
        self.id = id
        self.name = name

# Define the data
customer_data = [(1, "Tom"), (2, "Harry"), (-1, "Paul")]

# Create RDD
customers_rdd = sc.parallelize([Customer(*c) for c in customer_data])

# Accumulator that counts invalid customer ids
bad_ids = sc.accumulator(0)

# Filter valid customers and update the accumulator for bad ids
def filter_and_count(c):
    if c.id < 0:
        bad_ids.add(1)
        return False
    return True

# Count the valid customers; the action also triggers the accumulator updates
valid_customers_rdd = customers_rdd.filter(filter_and_count)
valid_customer_ids = valid_customers_rdd.count()
invalid_customer_ids = bad_ids.value

# Print the results
print(f"Valid Customer IDs: {valid_customer_ids}")
print(f"Invalid Customer IDs: {invalid_customer_ids}")

Valid Customer IDs: 2
Invalid Customer IDs: 1


In [21]:
sc.stop()


Caution is advised when using Accumulators in Spark, as updates within transformations may not be executed exactly once, especially if a task or stage is re-executed. Additionally, updates are deferred until an RDD action method is called, potentially leading to incorrect values if the driver program queries the accumulator before an action is invoked.
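Both pitfalls can be reproduced with a small plain-Python model. It is a sketch under stated assumptions rather than Spark code: the dictionary `bad_ids` stands in for an accumulator, `count_valid` stands in for a filter followed by count on an uncached RDD, and all names are hypothetical.

```python
bad_ids = {"count": 0}   # stands in for an accumulator

def is_valid(customer_id):
    if customer_id < 0:
        bad_ids["count"] += 1   # side effect inside a "transformation"
        return False
    return True

customer_ids = [1, 2, -1]

def count_valid():
    """Stands in for filter + count on an uncached RDD: the filter re-runs."""
    return sum(1 for cid in customer_ids if is_valid(cid))

value_before_action = bad_ids["count"]      # read too early: no action has run

first_count = count_valid()
value_after_one_action = bad_ids["count"]

second_count = count_valid()                # recomputation repeats the update
value_after_two_actions = bad_ids["count"]

print(value_before_action, value_after_one_action, value_after_two_actions)
```

The counter reads zero before any action, and it is incremented again when the same transformation is recomputed, even though there is still only one invalid identifier in the data.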



---
# Summary :


Spark is a fast, fault-tolerant, and easy-to-use in-memory cluster computing framework that can run some workloads up to 100 times faster than Hadoop MapReduce. Its expressive API in Java, Python, Scala, and R improves developer productivity. It offers a unified platform for a variety of big data applications and supports interactive data analysis and iterative algorithms. The programming model revolves around RDDs, which resemble Scala collections and enable distributed data processing.

*THE END*

---



---

