# Installing Required Packages


## Local System
If you want to run this Jupyter notebook on your local system, you only need to install *PySpark*; the package bundles Spark itself, although a Java runtime must also be available on the machine. Be aware that a local installation gives you a single-node cluster, so you cannot leverage the full power of Spark. It is also recommended to install *FindSpark*, which helps locate Spark on the system.

**⚠** When running a Spark application on a single node, also pay attention to *closures* and *variable sharing*: code that modifies driver-side variables from inside a task might work on your local system but not behave as expected in a multi-node cluster.



In [None]:
!pip install pyspark findspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=5b18d8b7740ad369b8f14d6c53a30f1c91d4c047208aa56358972aac456056dc
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: findspark, pyspark
Successfully installed findspark-2.0.1 pyspark-3.5.1


## Google Colab
If you want to run this Jupyter notebook on Google Colab, you need to execute the following cells.

The following code will install the necessary applications and packages for working with Spark. These packages are:
- JDK: Java Development Kit
- Spark 3.5.1 (Released February 2024)
- Findspark (used to locate Spark in the system)


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

## Setting Environment Variables

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [None]:
!ls

sample_data  spark-3.5.1-bin-hadoop3  spark-3.5.1-bin-hadoop3.tgz


# Verifying the Installation of PySpark


In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName('Exploring_spark').getOrCreate()

print('Spark version: ', spark.version)
spark.stop()

Spark version:  3.5.1


# Generating Synthetic Data for Our Analysis
To start, we will generate some synthetic data to work with. Let's assume we have a dataset that reflects customer behavior in an online shop. This dataset will contain one million records and will be saved in a text file, with each line representing one record. The fields in our dataset include:

- **User ID**: A unique identifier for each user.
- **Timestamp**: The date and time of the interaction.
- **Interaction Type**: The type of interaction the user had with the shop, such as "*click*", "*view*", or "*purchase*".
- **Item ID**: A unique identifier for each item in the shop.
- **Item Category**: The category of the item, such as "*appliance*", "*electronics*", "*books*", "*clothing*", or "*food*".



In [None]:
import random
from tqdm import tqdm
from datetime import datetime, timedelta

random.seed(126)  # for reproducibility (timestamps still depend on the current time)

def generate_data(num_records):
  '''
  Generates synthetic data, one comma-separated record at a time.

  Parameters:
  ------------
  num_records: the number of records to generate

  Samples:
  --------
  user_id, timestamp, interaction, item_id, item_category
  9432, 2024-07-29T12:22:32.732595, view, 4619, books
  2097, 2024-07-23T04:04:25.732595, view, 3673, clothing
  '''
  start_date = datetime.now()
  for _ in range(num_records):
    user_id = random.randint(1, 10_000)
    # pick a moment within the last 30 days (2,592,000 seconds)
    timestamp = (start_date - timedelta(seconds=random.randint(0, 2_592_000))).isoformat()
    interaction = random.choice(["click", "view", "purchase"])
    item_id = random.randint(1, 5_000)
    item_category = random.choice(["appliance", "electronics", "books", "clothing", "food"])
    yield ", ".join(map(str, [user_id, timestamp, interaction, item_id, item_category]))

def write_data_to_file(file_path, num_records, buffer_size=1000, overwrite_file=True):
  '''
  Generates synthetic data and writes it to a file.
  A buffer reduces the number of I/O operations in order to increase efficiency.

  Parameters:
  ------------
  file_path: the output file path
  num_records: the number of records to generate
  buffer_size: the number of records kept in memory before they are written to the output file
  overwrite_file: a flag that determines whether to overwrite the file or append to it
  '''
  mode = "w" if overwrite_file else "a"  # "w" truncates the existing content of the file

  with open(file_path, mode) as f:
    buffer = []
    for line in tqdm(generate_data(num_records), total=num_records, desc="Writing data"):
      buffer.append(line + "\n")
      if len(buffer) >= buffer_size:
        f.writelines(buffer)
        buffer = []

    if buffer:  # write the remaining lines in the buffer
      f.writelines(buffer)

In [None]:
# Write 1,000,000 records to data.txt
write_data_to_file("data.txt", 1_000_000)

Writing data: 100%|██████████| 1000000/1000000 [00:10<00:00, 93701.75it/s]


# RDD
RDD stands for **R**esilient **D**istributed **D**ataset and is the fundamental building block of Apache Spark. While higher-level abstractions like `DataFrames` and `Datasets` are built on top of RDDs, understanding RDDs is crucial for mastering Spark.

**What is an RDD?**

An RDD is a *collection* of elements *partitioned* across the nodes of a cluster that can be operated on in parallel. This parallelism allows Spark to handle large-scale data processing efficiently.

**Key Features of RDDs**
- **Resilient**: RDDs are fault-tolerant and can automatically recover from node failures. This resilience is achieved through the RDD's lineage, a *Directed Acyclic Graph* (DAG) that tracks the series of transformations that produced the RDD, so lost partitions can be recomputed.
- **Distributed**: Data in an RDD is distributed across multiple nodes in the cluster, enabling parallel processing.
- **Dataset**: An RDD is essentially a distributed collection of objects.

**How RDDs Work**

First, the *driver* (your application) connects to the Spark cluster. When you put a collection of data into an *RDD*, Spark breaks it into smaller chunks known as *partitions* or slices. These partitions are distributed among the nodes of the cluster. When the driver wants to perform a task on the data, Spark ships the task to each node, where workers run it on their respective partitions. In this manner, Spark uses the combined power of the cluster to parallelize computation. Spark handles many challenges under the hood, such as scheduling and recovery from failures, so the results appear seamless to the user.

When working with RDDs, Spark builds a Directed Acyclic Graph (DAG) of transformations. These transformations (such as `map`, `filter`, and `flatMap`) define the computation logic, but Spark does not execute them immediately. Spark only triggers the actual computation when the driver calls an action (like `count`, `collect`, `take`, `reduce`, or `saveAsTextFile`). This lazy evaluation lets Spark optimize the processing, and the recorded DAG is what enables fault tolerance.
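The idea of lazy evaluation can be illustrated without Spark. The following is a pure-Python analogy, not Spark code: the built-in `map` and `filter` return lazy iterators, which play the role of transformations, while `list(...)` plays the role of an action. The helper `traced_square` and the `log` list are hypothetical names used only to record when work actually happens.

```python
# Pure-Python analogy of lazy evaluation (an illustration, not Spark itself).
log = []  # records every element that is actually processed

def traced_square(x):
    log.append(x)
    return x * x

numbers = range(10)

# "Transformations": building the pipeline does not process any element yet.
squared = map(traced_square, numbers)                  # lazy, like rdd.map(...)
odd_squares = filter(lambda v: v % 2 == 1, squared)    # lazy, like rdd.filter(...)
calls_before_action = len(log)

# "Action": consuming the pipeline triggers the whole computation.
result = list(odd_squares)                             # eager, like rdd.collect()
calls_after_action = len(log)

print(calls_before_action, calls_after_action, result)
# 0 10 [1, 9, 25, 49, 81]
```

Nothing is computed until the result is requested, which is the same behaviour you will see with RDD transformations and actions in the cells that follow.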




## SparkContext
`SparkContext` is the entry point for connecting to Spark in order to work with RDDs. It is the main gateway through which your Spark application (the driver) interacts with the Spark cluster.

💡 **Key Points**
- A driver can have only one active SparkContext at a time.
- Before creating a new SparkContext, you must stop the previous one.


In [None]:
from pyspark import SparkContext

# Create a SparkContext for working with RDDs
sc = SparkContext(master='local[*]',
                  appName='Exploring Python Spark RDD and DataFrame basics')

# Verify that everything is working correctly
print('Spark version: ', sc.version)

print(f"\n{'key':^35} | {'value':^20}")
print('-'*35,'|', '-'*20)
for i in sc.getConf().getAll():
  print(f'{i[0]:<35} | {i[1]:<20}')

sc.stop()


Spark version:  3.5.1

                key                 |        value        
----------------------------------- | --------------------
spark.app.name                      | Exploring Python Spark RDD and DataFrame basics
spark.driver.extraJavaOptions       | -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.cal

💡 In newer versions of Spark, it's recommended to use `SparkSession`, which includes `SparkContext` as an attribute. This approach is **preferred** for applications that use DataFrames or SQL operations.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Exploring Python Spark RDD and DataFrame basics") \
    .master("local[*]") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .getOrCreate()

sc = spark.sparkContext
print(sc.version)

3.5.1


💡 `local` indicates that Spark will run locally on your machine. The asterisk [*] specifies that Spark should use all available CPU cores on the local machine for execution.

## Creating an RDD
There are two ways to create RDDs:
- **Parallelizing** an existing collection in your driver program.
- **Referencing external resources**, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

💡 RDDs are **immutable**: once created, an RDD cannot be changed. Transformations always return a new RDD.

In [None]:
range_rdd = sc.parallelize(range(0, 1_000_000), 4)
print(f'number of partitions: {range_rdd.getNumPartitions()}')

number of partitions: 4


**Number of partitions (slices)**
- Data is broken into partitions that are distributed among the nodes of the cluster.
- Normally, Spark tries to set the number of partitions *automatically* based on your cluster, so you don't need to set it manually. You can still pass it explicitly as the second argument to `parallelize`, as in the cell above.
- Spark will run **one** task for **each partition**.
- Typically you want 2-4 partitions for each CPU in your cluster.
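To make the idea of slicing concrete, here is a minimal pure-Python sketch of how a collection of `num_elements` items could be cut into `num_partitions` contiguous, nearly equal slices. The function `slice_bounds` is a hypothetical helper written for this notebook; it assumes a simple even positional split, and Spark's internal slicing logic may differ in its details.

```python
def slice_bounds(num_elements, num_partitions):
    """Return (start, end) index pairs that split num_elements as evenly as possible."""
    return [
        (i * num_elements // num_partitions, (i + 1) * num_elements // num_partitions)
        for i in range(num_partitions)
    ]

# One million elements in 4 partitions, as in the parallelize call above.
bounds = slice_bounds(1_000_000, 4)
sizes = [end - start for start, end in bounds]
print(sizes)          # [250000, 250000, 250000, 250000]

# When the split is uneven, partition sizes differ by at most one element.
uneven_sizes = [end - start for start, end in slice_bounds(10, 4)]
print(uneven_sizes)   # [2, 3, 2, 3]
```

Each slice then becomes the input of exactly one task, which is why the number of partitions bounds the amount of parallelism.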


❓ Let's see the number of elements in our RDD which we have just created.

In [None]:
print(f'total number of elements = {range_rdd.count():,}')

total number of elements = 1,000,000


PySpark can create distributed datasets from any **storage source** supported by Hadoop:
 - local file system,
 - HDFS,
 - Cassandra,
 - HBase,
 - Amazon S3,
 - etc.


 Spark supports:
 - text files,
 - SequenceFiles (a binary file format used in Hadoop to store key-value pairs),
 - and any other Hadoop InputFormat.

How many partitions (slices) does Spark create when it reads our text file?

In [None]:
text_rdd=sc.textFile('data.txt')
print(f'number of partitions: {text_rdd.getNumPartitions()}')

number of partitions: 4


## RDD Operations
RDDs support two types of operations:
1. **Transformations**, which *create* a new dataset from an existing one. `map`, `filter`, `flatMap`, `union`, and `intersection` are examples of transformations.

2. **Actions**, which *return* a value to the driver program after running a computation on the dataset. `collect`, `take`, `count`, and `countByKey` are examples of actions.




Let's try some basic operations and see how Spark performs.

**Take** and **Collect** return the results of Spark processing to the driver program.

Let's look at the first 5 records of our `range_rdd`:

In [None]:
range_rdd.take(5)

[0, 1, 2, 3, 4]

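**Sample**: Randomly select a fraction of the elements of an RDD. The `fraction` argument is the probability of keeping each element, so the size of the result is only approximately `fraction × count` (here 5 elements rather than exactly 7).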

In [None]:
fraction = 7 / range_rdd.count()
print(fraction)
range_rdd.sample(False, fraction, seed=126).collect()

7e-06


[362710, 419395, 619157, 677070, 931036]
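Why did we get 5 elements instead of 7? Sampling without replacement keeps each element independently with probability `fraction`, so the sample size is random. The following pure-Python sketch mimics that behaviour; `bernoulli_sample` is a hypothetical helper, and its random number stream is not the one Spark uses, so the selected elements will differ from the output above.

```python
import random

def bernoulli_sample(elements, fraction, seed):
    """Keep each element independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in elements if rng.random() < fraction]

data = range(1_000_000)
picked = bernoulli_sample(data, 7 / 1_000_000, seed=126)

# The size is 7 on average, but any particular run may return a few more or fewer.
print(len(picked), picked)
```

If you need an exact number of elements, the `takeSample` action listed later in this notebook returns exactly `num` elements to the driver.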

**Map**: Create a new RDD that contains the squared value of each element.
$$x \longrightarrow x^2 $$

In [None]:
squared_num_rdd = range_rdd.map(lambda x: x**2)
squared_num_rdd.take(5)

[0, 1, 4, 9, 16]

**Filter**: Create a new RDD that keeps only the squared values that are odd.

In [None]:
odd_squared_num_rdd = squared_num_rdd.filter(lambda x: x % 2 != 0)
odd_squared_num_rdd.take(5)

[1, 9, 25, 49, 81]

**Reduce**: Compute the sum of the squared numbers by chaining (cascading) `map` and `reduce` in one pipeline: $$ \sum_i x_i^2 $$

In [None]:
%%time
total = range_rdd.map(lambda x: x**2).reduce(lambda x, y: x+y)
print(f'sum of squared numbers = {total:,}')
print()

sum of squared numbers = 333,332,833,333,500,000

CPU times: user 17.3 ms, sys: 1.14 ms, total: 18.5 ms
Wall time: 1.42 s


**Count**: Count the prime numbers in the RDD.

In [None]:
%%time
def is_prime(n):
  if n < 2:
    return False
  for i in range(2, int(n**0.5) + 1):
    if n % i == 0:
      return False
  return True

count_prime = range_rdd.filter(is_prime).count()
print(f'number of prime numbers = {count_prime:,}')
print()

number of prime numbers = 78,498

CPU times: user 41.5 ms, sys: 5.15 ms, total: 46.7 ms
Wall time: 6.19 s


**Map**: Pair each number with its number of digits.
$$ x \longrightarrow (\text{len}(x),\ x) $$

In [None]:
pair_digit_num_rdd = range_rdd.map(lambda x: (len(str(x)), x))
pair_digit_num_rdd.take(5)

[(1, 0), (1, 1), (1, 2), (1, 3), (1, 4)]

**GroupByKey**: Groups the numbers based on the number of digits they contain and determines how many numbers belong to each group. For example, the number 9 belongs to group 1, which contains one-digit numbers.

💡 Before using `collect`, ensure that your data fits in memory. Do NOT `collect` the entire dataset across the cluster.

In [None]:
pair_digit_num_rdd.groupByKey().map(lambda x: (x[0], len(x[1]))).sortByKey().collect()

[(1, 10), (2, 90), (3, 900), (4, 9000), (5, 90000), (6, 900000)]

**CountByKey**: We can do the same task with the `countByKey` action. The output is a dictionary.

In [None]:
pair_digit_num_rdd.countByKey()

defaultdict(int, {1: 10, 2: 90, 3: 900, 4: 9000, 5: 90000, 6: 900000})

**ReduceByKey**: Yet another way to get the same result is to map every pair to `(key, 1)` and sum the ones per key.

In [None]:
pair_digit_num_rdd.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x+y).sortByKey().collect()

[(1, 10), (2, 90), (3, 900), (4, 9000), (5, 90000), (6, 900000)]
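All three approaches return the same counts, but they do not move the same amount of data. `reduceByKey` combines values inside each partition before the shuffle, whereas `groupByKey` sends every record across it. The pure-Python simulation below counts the records that would be shuffled in each case; the four partitions and both helper functions are hypothetical and only model the idea.

```python
from collections import defaultdict

# Four hypothetical partitions of (digit_count, 1) pairs for the numbers 0..999.
numbers = list(range(1000))
partitions = [numbers[i::4] for i in range(4)]
pair_partitions = [[(len(str(n)), 1) for n in part] for part in partitions]

def shuffle_group_by_key(parts):
    """Every record crosses the shuffle; grouping happens afterwards."""
    shuffled = [pair for part in parts for pair in part]
    groups = defaultdict(list)
    for key, value in shuffled:
        groups[key].append(value)
    return len(shuffled), {k: len(v) for k, v in groups.items()}

def shuffle_reduce_by_key(parts, func):
    """Combine inside each partition first, then shuffle only the partial results."""
    shuffled = []
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    merged = {}
    for key, value in shuffled:
        merged[key] = func(merged[key], value) if key in merged else value
    return len(shuffled), merged

group_records, group_counts = shuffle_group_by_key(pair_partitions)
reduce_records, reduce_counts = shuffle_reduce_by_key(pair_partitions, lambda a, b: a + b)

print(group_records, reduce_records)     # 1000 12
print(sorted(reduce_counts.items()))     # [(1, 10), (2, 90), (3, 900)]
```

In this toy setup the shuffle shrinks from 1000 records to 12 (three keys in each of four partitions), which is the reason `reduceByKey` is usually preferred for aggregations.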

**mapValues**: Compute the number of elements, minimum, maximum and average for each group.

In [None]:
pair_digit_num_rdd.groupByKey().mapValues(lambda x: ( len(x), min(x), max(x) , sum(x)/len(x))).sortByKey().collect()

[(1, (10, 0, 9, 4.5)),
 (2, (90, 10, 99, 54.5)),
 (3, (900, 100, 999, 549.5)),
 (4, (9000, 1000, 9999, 5499.5)),
 (5, (90000, 10000, 99999, 54999.5)),
 (6, (900000, 100000, 999999, 549999.5))]
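The cell above materializes every group with `groupByKey` before computing the statistics. The same numbers can be obtained by folding values into a small accumulator per key, which is the pattern behind `aggregateByKey(zeroValue, seqFunc, combFunc)`. The sketch below simulates that pattern in pure Python on the numbers 0 to 999; `seq_op`, `comb_op`, `zero`, and the two partitions are illustrative names and assumptions, not part of the notebook's Spark code.

```python
def seq_op(acc, value):
    """Fold one value into a partition-local (count, min, max, total) accumulator."""
    count, lo, hi, total = acc
    return (count + 1, min(lo, value), max(hi, value), total + value)

def comb_op(a, b):
    """Merge two accumulators that were built on different partitions."""
    return (a[0] + b[0], min(a[1], b[1]), max(a[2], b[2]), a[3] + b[3])

zero = (0, float("inf"), float("-inf"), 0)  # neutral starting accumulator

numbers = list(range(1000))
partitions = [numbers[:500], numbers[500:]]  # two hypothetical partitions

# Step 1: each partition builds one accumulator per key (number of digits).
partials = []
for part in partitions:
    local = {}
    for n in part:
        key = len(str(n))
        local[key] = seq_op(local.get(key, zero), n)
    partials.append(local)

# Step 2: accumulators for the same key are merged across partitions.
merged = {}
for local in partials:
    for key, acc in local.items():
        merged[key] = comb_op(merged[key], acc) if key in merged else acc

stats = {k: (c, lo, hi, total / c) for k, (c, lo, hi, total) in sorted(merged.items())}
print(stats)
# {1: (10, 0, 9, 4.5), 2: (90, 10, 99, 54.5), 3: (900, 100, 999, 549.5)}
```

Only one four-element tuple per key and partition has to be exchanged, instead of every value in the group.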

**filter**: Identify how many *five-digit* numbers are divisible by *7*.

In [None]:
pair_digit_num_rdd.filter(lambda x: x[0] == 5 and x[1] % 7 == 0).count()

12857

**mapPartitions**: Count the number of elements in each partition (slice)

In [None]:
def f(iterator): yield sum(1 for _ in iterator)
count_per_partition = range_rdd.mapPartitions(f).collect()
print(f'Number of points per partition = {count_per_partition}')

Number of points per partition = [250000, 250000, 250000, 250000]


### List of RDD Transformations

- **map(func)**	Return a new distributed dataset formed by passing each element of the source through a function func.

- **filter(func)**	Return a new dataset formed by selecting those elements of the source on which func returns true.

- **distinct**([numPartitions])	Return a new dataset that contains the distinct elements of the source dataset.

- **flatMap(func)**	Similar to map, but each input item can be mapped to 0 or more output items (so func should return a sequence, or any iterable in Python, rather than a single item).

- **sample**(withReplacement, fraction, seed)	Sample a fraction of the data, with or without replacement, using a given random number generator seed.

- **union**(otherDataset)	Return a new dataset that contains the union of the elements in the source dataset and the argument.

- **intersection**(otherDataset)	Return a new RDD that contains the intersection of elements in the source dataset and the argument.

- **groupByKey**([numPartitions])	When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
  - Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
  - Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.

- **reduceByKey**(func, [numPartitions])	When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

- **aggregateByKey**(zeroValue)(seqOp, combOp, [numPartitions])	When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

- **sortByKey**([ascending], [numPartitions])	When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

- **join**(otherDataset, [numPartitions])	When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

- **cogroup**(otherDataset, [numPartitions])	When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

- **mapPartitions(func)**	Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type `Iterator<T> => Iterator<U>` when running on an RDD of type T.

- **mapPartitionsWithIndex(func)**	Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type `(Int, Iterator<T>) => Iterator<U>` when running on an RDD of type T.

- **cartesian**(otherDataset)	When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

- **pipe**(command, [envVars])	Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.

- **coalesce**(numPartitions)	Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

- **repartition**(numPartitions)	Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

- **repartitionAndSortWithinPartitions**(partitioner)	Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

### List of RDD Actions

- **count()**	Return the number of elements in the dataset.

- **countByKey()**	Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

- **collect()**	Return all the elements of the dataset as an array at the driver program.

- **first()**	Return the first element of the dataset (similar to take(1)).

- **take(n)**	Return an array with the first n elements of the dataset.

- **takeOrdered**(n, [ordering])	Return the first n elements of the RDD using either their natural order or a custom comparator.

- **takeSample**(withReplacement, num, [seed])	Return an array with a random sample of num elements of the dataset, with or without replacement. ⚠ Do not load the whole RDD to driver memory. Consider using `sample` transformation instead.

- **reduce**(func)	Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be *commutative* and *associative* so that it can be computed correctly in parallel.

- **saveAsTextFile**(path)	Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

- **foreach(func)**	Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
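The requirement that the function passed to `reduce` be commutative and associative can be checked with a small pure-Python simulation. The hypothetical helper `partitioned_reduce` reduces each partition locally and then reduces the partial results, which is a simplified model of what happens in parallel: addition gives the same answer for any number of partitions, while subtraction does not.

```python
from functools import reduce

def partitioned_reduce(elements, num_partitions, func):
    """Reduce each partition locally, then reduce the partial results."""
    size = -(-len(elements) // num_partitions)  # ceiling division
    parts = [elements[i:i + size] for i in range(0, len(elements), size)]
    partials = [reduce(func, part) for part in parts]
    return reduce(func, partials)

data = list(range(1, 9))  # 1..8

def add(a, b):
    return a + b

def sub(a, b):
    return a - b

add_results = [partitioned_reduce(data, p, add) for p in (1, 2, 4)]
sub_results = [partitioned_reduce(data, p, sub) for p in (1, 2, 4)]

print(add_results)  # [36, 36, 36]
print(sub_results)  # [-34, 8, 2]
```

With subtraction, the result depends on how the data happens to be partitioned, so such a function would give unpredictable answers on a real cluster.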


## Performance Tips
In this section, we will discuss some performance issues and best practices in Spark.

Let's begin with how Spark keeps data in memory. As you may have heard, Spark can keep intermediate data in memory to avoid expensive disk I/O operations. This is a key reason Spark often outperforms Hadoop MapReduce, especially for workloads that reuse the same data several times.

Now, consider this scenario: imagine we have a huge dataset, say 10 billion numbers, stored in an RDD. As we know, Spark distributes this data into smaller partitions across the nodes in the cluster. Suppose we apply some *transformations* to the original data, creating a new RDD, and ask Spark to keep it in memory. This new RDD may also fit into memory. However, if we continue in this manner, we will eventually reach a point where the new RDDs no longer fit in memory. What happens then?

The answer lies in how Spark handles data. First, remember that transformations are lazy: the contents of a new RDD are only computed when an action is called. Until then, the Directed Acyclic Graph (DAG) merely keeps track of the transformations that lead from the original RDD to the new one. Second, Spark does not keep a computed RDD around unless you ask for it with `cache()` or `persist()`; otherwise every action recomputes the RDD from its lineage. When you do persist an RDD with the default storage level and it doesn’t fit in memory, Spark simply does not **cache** the partitions that don't fit. Instead, it recomputes them on the fly each time they are needed.

Commonly used storage levels include:

- **MEMORY_ONLY**: If the RDD does not fit in memory, some partitions will not be cached and will be *recomputed* on the fly each time they are needed. This is the default storage level.

- **MEMORY_AND_DISK**: If the RDD does not fit in memory, the partitions that don't fit will be *stored on disk* and read from there when needed.

- **DISK_ONLY**: The RDD partitions are stored only on disk.

**cache**

Suppose you have completed a sequence of time-consuming transformations, and you want to prevent Spark from repeating these operations each time an action needs their result. In this case, you may want to keep the results in memory for future use. Two methods come in handy: `cache()`, which uses the default storage level, and `persist()`, which lets you choose a storage level.

**unpersist**

On the other hand, if a persisted RDD is no longer needed and you want Spark to release the memory it occupies, `unpersist()` is your friend.


In [None]:
from time import perf_counter

def is_prime(n):
  if n < 2:
    return False
  for i in range(2, int(n**0.5) + 1):
    if n % i == 0:
      return False
  return True

# Create an RDD
range_rdd = sc.parallelize(range(1, 1_000_000), 4)

# Perform a task without caching
prime_numbers = range_rdd.filter(is_prime)
t1 = perf_counter()
result_no_cache = (prime_numbers.count(),  prime_numbers.sum())
t2 = perf_counter()
print(f"Result without caching: {result_no_cache}")

# Perform a task with caching
prime_numbers2 = range_rdd.filter(is_prime).cache()
t3 = perf_counter()
result_cache = (prime_numbers2.count(), prime_numbers2.sum())
t4 = perf_counter()
print(f"Result with caching: {result_cache}")

print()
print(f"Time without caching: {t2 - t1:.4f} seconds")
print(f"Time with caching: {t4 - t3:.4f} seconds")
print()
print('storage level(without caching): ', prime_numbers.getStorageLevel())
print('storage level(with caching): ', prime_numbers2.getStorageLevel())

Result without caching: (78498, 37550402023)
Result with caching: (78498, 37550402023)

Time without caching: 13.9924 seconds
Time with caching: 7.1529 seconds

storage level(without caching):  Serialized 1x Replicated
storage level(with caching):  Memory Serialized 1x Replicated
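The timing difference comes from how often the filter function runs. Without caching, each action (`count`, then `sum`) recomputes the filtered RDD from scratch; with caching, the second action reuses the stored result. The toy class below models this in pure Python and counts the calls; `LazyDataset`, `recipe`, and `is_even` are hypothetical names for this sketch and are not part of Spark.

```python
class LazyDataset:
    """Toy stand-in for an RDD: stores a recipe and runs it whenever an action needs data."""

    def __init__(self, compute, cached=False):
        self._compute = compute   # zero-argument function that produces the elements
        self._cached = cached
        self._data = None

    def _materialize(self):
        if self._cached and self._data is not None:
            return self._data     # reuse the stored result
        data = list(self._compute())
        if self._cached:
            self._data = data
        return data

    def count(self):
        return len(self._materialize())

    def sum(self):
        return sum(self._materialize())

calls = {"n": 0}

def is_even(n):
    calls["n"] += 1               # count how often the predicate is evaluated
    return n % 2 == 0

def recipe():
    return (n for n in range(1000) if is_even(n))

plain = LazyDataset(recipe)
plain.count()
plain.sum()
calls_without_cache = calls["n"]

calls["n"] = 0
cached = LazyDataset(recipe, cached=True)
cached.count()
cached.sum()
calls_with_cache = calls["n"]

print(calls_without_cache, calls_with_cache)  # 2000 1000
```

The predicate runs twice per element without caching and once per element with caching, which mirrors the roughly halved wall time in the output above.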


## Unpersisting Data
Spark automatically monitors cache usage on each node and drops old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the `RDD.unpersist()` method. Note that this method does not block by default. To block until resources are freed, specify `blocking=True` when calling this method.

In [None]:
range_rdd.cache()
print('storage level(cache): ', range_rdd.getStorageLevel())
range_rdd.unpersist(blocking=True)
print('storage level(unpersist): ', range_rdd.getStorageLevel())

In [None]:
range_rdd.take(5)
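The least-recently-used (LRU) policy mentioned in this section can be sketched with an `OrderedDict`. This is a toy model under simplifying assumptions (a fixed number of partitions as capacity, hypothetical partition names); Spark's real block manager works with memory sizes and storage levels rather than a simple count.

```python
from collections import OrderedDict

class LruPartitionCache:
    """Toy cache holding at most `capacity` partitions, evicting the least recently used."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._store = OrderedDict()

    def get(self, key):
        if key not in self._store:
            return None
        self._store.move_to_end(key)          # mark as most recently used
        return self._store[key]

    def put(self, key, partition):
        self._store[key] = partition
        self._store.move_to_end(key)
        while len(self._store) > self.capacity:
            self._store.popitem(last=False)   # drop the least recently used entry

    def unpersist(self, key):
        self._store.pop(key, None)            # manual removal, no waiting for eviction

    def keys(self):
        return list(self._store)

cache = LruPartitionCache(capacity=2)
cache.put("rdd_a/part0", [1, 2])
cache.put("rdd_b/part0", [3, 4])
cache.get("rdd_a/part0")            # touch rdd_a, so rdd_b becomes the oldest entry
cache.put("rdd_c/part0", [5, 6])    # capacity exceeded: rdd_b is evicted
after_eviction = cache.keys()
print(after_eviction)               # ['rdd_a/part0', 'rdd_c/part0']

cache.unpersist("rdd_a/part0")
after_unpersist = cache.keys()
print(after_unpersist)              # ['rdd_c/part0']
```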

## Accumulators
💡 Supporting general, read-write **shared** variables across tasks would be inefficient.

However, Spark does provide two limited types of shared variables for two common usage patterns:

- broadcast variables, which keep a read-only value cached on every node instead of shipping a copy with each task
- accumulators, which tasks can only add to

Accumulators in Spark are used specifically to provide a mechanism for safely updating a **shared** variable when execution is split up across worker nodes in a cluster.

In [None]:
accum = sc.accumulator(0)

def count_even(x):
  # Tasks can only add to an accumulator; only the driver can read its value
  if x % 2 == 0:
    accum.add(1)

sc.parallelize(range(0, 100_000)).foreach(count_even)
print(f'accumulated value = {accum.value:,}')
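To see why an ordinary variable is not enough, consider the following pure-Python simulation. It assumes, as a simplification, that each task receives its own copy of the variables captured by the closure, while accumulator updates are reported back and merged by the driver. The function `run_task` and the dictionary `driver_counter` are hypothetical names used only in this sketch.

```python
import copy

def run_task(partition, shared_counter):
    """Simulate a task on an executor that received its own copy of the closure."""
    local_counter = copy.deepcopy(shared_counter)  # closure variables are copied
    local_updates = 0                              # accumulator updates made by this task
    for x in partition:
        if x % 2 == 0:
            local_counter["evens"] += 1            # changes the copy only
            local_updates += 1
    return local_updates                           # sent back to the driver

driver_counter = {"evens": 0}
partitions = [range(0, 50), range(50, 100)]

# The driver merges the updates reported by every task.
accumulator_value = sum(run_task(part, driver_counter) for part in partitions)

print(driver_counter["evens"], accumulator_value)  # 0 50
```

The plain counter on the driver never changes, whereas the merged accumulator value reflects the work of all tasks. In local mode the plain-variable version can appear to work, which is the pitfall mentioned at the beginning of this notebook.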


## Examples

### Example 1: Word Counter
**Task 1**: Count the occurrences of each word in a text document and sort the results based on frequency.


In [None]:
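text_rdd = sc.textFile('/data/text.txt')
# split() without arguments splits on any whitespace and drops empty strings,
# so repeated spaces do not produce empty "words"
count_text_rdd = text_rdd.flatMap(lambda x: x.lower().split())\
        .map(lambda x: (x, 1))\
        .reduceByKey(lambda x, y: x+y)\
        .sortBy(lambda x: x[1], ascending=False)
count_text_rdd.take(10)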

**Task 2**: Group words based on the number of characters in each word, and identify the number of *unique* words **in** each group.


In [None]:
count_text_rdd.map(lambda x: (len(x[0]), 1))\
        .reduceByKey(lambda x, y: x+y)\
        .sortBy(lambda x: x[0] , ascending=False)\
        .take(5)

### Example 2: Computing $\pi$


In [None]:
import random

def sample(_):
  x, y = random.random(), random.random()
  return 1 if x*x + y*y <= 1 else 0

N= 1_000_000
count = sc.parallelize(range(0, N)).map(sample)\
             .sum()
print(f"Pi is roughly {4.0 * count / N}")
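The estimate works because the points $(x, y)$ are drawn uniformly from the unit square, so the fraction that lands inside the quarter disc $x^2 + y^2 \le 1$ approximates the ratio of the two areas. A short derivation, using the quantities `count` and `N` from the cell above and writing $p$ for the probability that a point falls inside the quarter disc:

$$ p = P(x^2 + y^2 \le 1) = \frac{\text{area of quarter disc}}{\text{area of unit square}} = \frac{\pi/4}{1} \quad \Longrightarrow \quad \pi \approx 4 \cdot \frac{\text{count}}{N} $$

Each sample is a Bernoulli trial with success probability $p = \pi/4$, so the standard error of the estimate is

$$ 4 \sqrt{\frac{p\,(1-p)}{N}} \approx \frac{1.64}{\sqrt{N}} $$

which is about $0.0016$ for $N = 10^6$. The printed value should therefore typically agree with $\pi$ to two or three decimal places.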

## Stop Spark
At the end, close the connection to Spark by calling the `stop` method.

In [None]:
sc.stop()

# References
1. [RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations) on the Spark website.
2. [Apache Spark / PySpark Tutorial: Basics In 15 Minutes](https://www.youtube.com/watch?v=QLQsW8VbTN4) - A concise and informative YouTube video by Greg Hogg that describes the basics of working with RDD in PySpark.
3. [PySpark Tutorial: Spark SQL & DataFrame Basics](https://www.youtube.com/watch?v=3-pnWVWyH-s) - Another great video by Greg Hogg, focusing on using PySpark DataFrames.
4. [Introduction to Google Colab and PySpark](https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/#big-data-pyspark-and-colaboratory) - Learn how to set up Google Colab for using PySpark, along with a thorough review of PySpark's capabilities.
5. [Spark with Python](https://github.com/tirthajyoti/Spark-with-Python/tree/master) - A GitHub repository that includes several Jupyter notebooks with examples of how to use PySpark.
6. [Apache Spark™ examples](https://spark.apache.org/examples.html)
7. [Apache Spark GitHub python examples](https://github.com/apache/spark/tree/master/examples/src/main/python)
