# Module 9: Working with RDDs and Shared Variables in PySpark


In this notebook, we will cover three major topics from Module 9:

1. **Load and Save Data Using RDD**
2. **Analyzing Hadoop Data with RDD**
3. **Using Broadcast and Accumulator Variables**

Each section includes explanations, example code, and interpretation of results.


## 1. Loading and Saving Data Using RDDs


RDDs (Resilient Distributed Datasets) are Spark's original distributed collection abstraction.

We can create RDDs from external data (e.g., `.txt` files) or from existing Python collections. The example below creates an RDD from a Python list, saves it to disk as text, and loads it back.


In [2]:
from pyspark.sql import SparkSession
import shutil

spark = SparkSession.builder.appName("Module9-RDD").getOrCreate()
sc = spark.sparkContext

# Create RDD from a local list
data = ["William", "Pourmajidi", "Advanced Python", "Apache Spark"]
rdd = sc.parallelize(data)

# Remove any previous output; saveAsTextFile fails if the directory already exists
output_dir = "output_rdd"
shutil.rmtree(output_dir, ignore_errors=True)

# Save RDD as text (this creates a directory with one part-00000-style file per partition)
rdd.saveAsTextFile(output_dir)

# Load RDD back from the saved directory
loaded_rdd = sc.textFile(output_dir)
loaded_rdd.collect()



['Advanced Python', 'Pourmajidi', 'William', 'Apache Spark']


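This round trip shows that an RDD can be written to storage and read back, which makes RDDs suitable for batch-processing pipelines. The reloaded elements can come back in a different order from the original list, as they do in the output above, because the data is read back from separate part files.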


## 2. Analyzing Simulated Hadoop Data Using RDD


We'll simulate Hadoop-style logs with synthetic text and show how RDDs can help in analysis tasks like counting and filtering.


In [3]:
# Simulated Hadoop logs
log_data = [
    "ERROR 2025-07-01 Connection refused",
    "INFO 2025-07-01 Service started",
    "ERROR 2025-07-02 Disk full",
    "WARN 2025-07-02 High memory usage",
    "INFO 2025-07-03 Job completed"
]
log_rdd = sc.parallelize(log_data)

# Define a function to extract log level and count
# The function returns a tuple of (log level, count)
def extract_log_level(line):
    log_level = line.split()[0]
    count = 1
    return (log_level, count)

# Define a function to reduce log level counts
def reduce_log_level_counts(a, b):
    return a + b

# Count log levels
counts = log_rdd.map(extract_log_level).reduceByKey(reduce_log_level_counts)
counts.collect()




[('WARN', 1), ('ERROR', 2), ('INFO', 2)]



### MapReduce Paradigm and Spark

### Overview of MapReduce

The MapReduce paradigm is a programming model used for processing large datasets in a distributed computing environment. It consists of three main steps:

#### Map

* Split a large dataset into smaller chunks and apply a transformation function to every record in each chunk.
* This produces a new dataset of intermediate key-value pairs.

#### Shuffle

* Redistribute the intermediate pairs across nodes so that all values with the same key end up together.

#### Reduce

* Apply a reduction function to each group of data, producing a smaller output dataset.
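The three steps can be sketched in plain Python, without Spark, using the same simulated log lines. This is only an illustration of the model on a single machine, not how Hadoop or Spark implement it; the variable names are chosen for this sketch.

```python
from collections import defaultdict
from functools import reduce

log_lines = [
    "ERROR 2025-07-01 Connection refused",
    "INFO 2025-07-01 Service started",
    "ERROR 2025-07-02 Disk full",
    "WARN 2025-07-02 High memory usage",
    "INFO 2025-07-03 Job completed",
]

# Map: emit one (key, value) pair per input record
mapped = [(line.split()[0], 1) for line in log_lines]

# Shuffle: group all values that share the same key
grouped = defaultdict(list)
for key, value in mapped:
    grouped[key].append(value)

# Reduce: collapse each group of values into a single result
reduced = {key: reduce(lambda a, b: a + b, values) for key, values in grouped.items()}
```

`reduced` ends up as `{"ERROR": 2, "INFO": 2, "WARN": 1}`, the same counts the Spark job produces.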

### Spark's Map and Reduce Operations

Spark's RDD API offers counterparts to these steps, with some differences:

#### Map

* In Spark, the `map` operation is performed by applying a transformation function to each element of an RDD (Resilient Distributed Dataset).

#### Reduce

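* In Spark, `reduce` is an action that combines all elements of an RDD into a single value returned to the driver. The per-key aggregation of the MapReduce reduce step is done by `reduceByKey`, which shuffles the data by key and applies the reduction function to the values of each key, as in the log-level count above.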





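In the log example above, `map` and `reduceByKey` are transformations and `collect` is the action that triggers execution. The same pattern scales from this small sample to the large log datasets typical of Hadoop environments.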


## 3. Using Broadcast and Accumulator Variables


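**Broadcast** variables are read-only values that Spark ships to each executor once and caches there, instead of sending a copy with every task.

**Accumulators** are shared variables that tasks can only add to; only the driver can read the accumulated value. They are typically used for counters and sums during distributed execution.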


In [6]:
# Create a list of words to be processed
words = ["spark", "python", "big", "data", "spark", "pyspark"]

# Broadcast the stopwords so that each executor receives one read-only copy
# A set gives constant-time membership checks
broadcast_stopwords = sc.broadcast({"big", "data"})

# Create an RDD from the list of words, distributing it across partitions
word_rdd = sc.parallelize(words)

# Predicate for filter(): keep a word only if it is not a stopword
def filter_out_stopwords(word):
    return word not in broadcast_stopwords.value

# filter() is a lazy transformation; nothing runs until an action is called
filtered = word_rdd.filter(filter_out_stopwords)

# collect() is an action: it runs the job and returns the results to the driver
filtered.collect()

['spark', 'python', 'spark', 'pyspark']

In [11]:
# Create an accumulator with an initial value of 0
# Tasks on the executors add to it; only the driver reads the result
accum = sc.accumulator(0)

# Increment the accumulator for every occurrence of the word "spark"
def count_spark(word):
    if word == "spark":
        # add() updates the accumulator in place, so no global declaration is needed
        accum.add(1)

# foreach() is an action that applies the function to each element and returns nothing
word_rdd.foreach(count_spark)

# Read the final value on the driver
accum.value

2