# Resilient Distributed Datasets  

Resilient Distributed Datasets (RDDs) are distributed collections of immutable JVM objects on which calculations run in parallel, and they are the backbone of Apache Spark.  

As the name suggests, the dataset is distributed; it is split into chunks based on some key and distributed to executor nodes. Doing so allows for running calculations against such datasets very quickly. Also, as already mentioned in Chapter 1, Understanding Spark, RDDs keep track (log) of all the transformations applied to each chunk to speed up the computations and provide a fallback if things go wrong and that portion of the data is lost; in such cases, RDDs can recompute the data. This data lineage is another line of defense against data loss, a complement to data replication.  



## Internal workings of an RDD  
RDDs operate in parallel. This is the strongest advantage of working in Spark: each transformation is executed in parallel, which can greatly increase speed.  
Transformations on a dataset are lazy. This means that a transformation is
only executed when an action is called on the dataset. This helps Spark to optimize the
execution.  
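Lazy evaluation can be illustrated without Spark at all. The following is a plain-Python analogy (not Spark code): Python's built-in `map` also defers work until something consumes its result, much as an RDD transformation waits for an action. The `double` function and the `calls` list are made up for this illustration.

```python
# Plain-Python analogy for lazy evaluation (this is not Spark code).
calls = []  # records every time the mapping function actually runs

def double(x):
    calls.append(x)
    return x * 2

# "Transformation": map() returns a lazy iterator, so nothing runs yet.
pipeline = map(double, [1, 2, 3, 4])
calls_before_action = len(calls)

# "Action": consuming the iterator forces the computation.
result = list(pipeline)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

Nothing is computed when the pipeline is defined; all four calls happen only once the result is requested.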


### Creating RDDs  
There are two ways to create an RDD in PySpark: you can either
.parallelize(...) a collection (a list or an array of some elements), or you can reference a file (or files) located either locally or somewhere externally:

* **Import necessary libraries**

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark import SparkContext

In [2]:
import findspark
findspark.init("C:/Program Files/spark-3.5.4-bin-hadoop3")

* **Configure environment variables dynamically**

In [3]:
os.environ["JAVA_HOME"] = "C:/Program Files/Java/jre1.8.0_431" 
os.environ["SPARK_HOME"] = "C:/Program Files/spark-3.5.4-bin-hadoop3" 
os.environ['HADOOP_HOME'] = 'C:/Program Files/hadoop-3.4.0'
 

* **Initialize SparkSession**

In [4]:
spark = SparkSession.builder \
    .appName("Learn PySpark") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

In [5]:
sc = spark.sparkContext         # Access the SparkContext
print("SparkSession and SparkContext initialized.")
print(sc)

SparkSession and SparkContext initialized.
<SparkContext master=local[*] appName=Learn PySpark>


In [6]:
print(sc.version)

3.5.4


In [7]:
data = sc.parallelize(
[('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12),
('Amber', 9)])

In [8]:
data_from_file = sc.\
textFile(
'datasets/VS14MORT.txt.gz', 4)

The last parameter in *sc.textFile(..., n)* specifies the (minimum) number of partitions the dataset is divided into.  
A rule of thumb would be to break your dataset into two to four partitions for each CPU in your cluster.

Spark can read from a multitude of filesystems: Local ones such as NTFS, FAT, or
Mac OS Extended (HFS+), or distributed filesystems such as HDFS, S3, Cassandra,
among many others.

Multiple data formats are supported: Text, parquet, JSON, Hive tables, and data
from relational databases can be read using a JDBC driver. Note that Spark
can automatically work with compressed datasets (like the Gzipped one in our
preceding example).

Depending on how the data is read, the object holding it will be represented slightly
differently. Data read from a file is represented as a MapPartitionsRDD, whereas a
collection that we .parallelize(...) is represented as a ParallelCollectionRDD.

### Schema  

RDDs are schema-less data structures (unlike DataFrames, which we will discuss in
the next chapter). Thus, parallelizing a dataset, such as in the following code snippet,
is perfectly fine with Spark when using RDDs:

In [9]:
data_heterogenous = sc.parallelize([
('Ferrari', 'fast'),
{'Porsche': 100000},
['Spain','visited', 4504]
]).collect()

So, we can mix almost anything: a tuple, a dict, or a list and Spark will
not complain.

Once you .collect() the dataset (that is, run an action to bring it back to the driver)
you can access the data in the object as you would normally do in Python:

In [10]:
data_heterogenous[0][:2]

('Ferrari', 'fast')

The .collect() method returns all the elements of the RDD to the driver where it is
serialized as a list.

### Reading from files  

When you read from a text file, each row from the file forms an element of an RDD.
The data_from_file.take(1) command will produce the following (somewhat
unreadable) output:

In [11]:
#data_from_file.limit(1).collect()



In [12]:
data_from_file.take(1)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 8) (host.docker.internal executor driver): java.net.SocketException: Connection reset by peer: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at java.io.DataOutputStream.write(Unknown Source)
	at java.io.FilterOutputStream.write(Unknown Source)
	at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:492)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:312)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:751)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at java.io.DataOutputStream.write(Unknown Source)
	at java.io.FilterOutputStream.write(Unknown Source)
	at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:492)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:312)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:751)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


In [None]:
data = sc.parallelize([1, 2, 3, 4, 5])
print(data.collect())

In [None]:
small_data = sc.parallelize([('Alice', 25), ('Bob', 30)])
print(small_data.take(1))

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Learning PySpark") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.network.timeout", "100s") \
    .getOrCreate()

sc = spark.sparkContext

# Retry your example
small_data = sc.parallelize([('Alice', 25), ('Bob', 30)])
print(small_data.take(1))


In [None]:
data_from_file.take(2)

### Lambda expressions  

In this example, we will extract the useful information from the cryptic looking
record of data_from_file.  

First, let's define the method with the help of the following code, which will parse
the unreadable row into something that we can use:

In [17]:
import re 
import numpy as np 

def extractInformation(row):
    selected_indices = [
         2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
         19,21,22,23,24,25,27,28,29,30,32,33,34,
         36,37,38,39,40,41,42,43,44,45,46,47,48,
         49,50,51,52,53,54,55,56,58,60,61,62,63,
         64,65,66,67,68,69,70,71,72,73,74,75,76,
         77,78,79,81,82,83,84,85,87,89
    ]
    
    
    '''
        Input record schema
        schema: n-m (o) -- xxx
            n - position from
            m - position to
            o - number of characters
            xxx - description
        1. 1-19 (19) -- reserved positions
        2. 20 (1) -- resident status
        3. 21-60 (40) -- reserved positions
        4. 61-62 (2) -- education code (1989 revision)
        5. 63 (1) -- education code (2003 revision)
        6. 64 (1) -- education reporting flag
        7. 65-66 (2) -- month of death
        8. 67-68 (2) -- reserved positions
        9. 69 (1) -- sex
        10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated
        11. 71-73 (3) -- number of units (years, months etc)
        12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)
        13. 75-76 (2) -- age recoded into 52 categories
        14. 77-78 (2) -- age recoded into 27 categories
        15. 79-80 (2) -- age recoded into 12 categories
        16. 81-82 (2) -- infant age recoded into 22 categories
        17. 83 (1) -- place of death
        18. 84 (1) -- marital status
        19. 85 (1) -- day of the week of death
        20. 86-101 (16) -- reserved positions
        21. 102-105 (4) -- current year
        22. 106 (1) -- injury at work
        23. 107 (1) -- manner of death
        24. 108 (1) -- manner of disposition
        25. 109 (1) -- autopsy
        26. 110-143 (34) -- reserved positions
        27. 144 (1) -- activity code
        28. 145 (1) -- place of injury
        29. 146-149 (4) -- ICD code
        30. 150-152 (3) -- 358 cause recode
        31. 153 (1) -- reserved position
        32. 154-156 (3) -- 113 cause recode
        33. 157-159 (3) -- 130 infant cause recode
        34. 160-161 (2) -- 39 cause recode
        35. 162 (1) -- reserved position
        36. 163-164 (2) -- number of entity-axis conditions
        37-56. 165-304 (140) -- list of up to 20 conditions
        57. 305-340 (36) -- reserved positions
        58. 341-342 (2) -- number of record axis conditions
        59. 343 (1) -- reserved position
        60-79. 344-443 (100) -- record axis conditions
        80. 444 (1) -- reserved position
        81. 445-446 (2) -- race
        82. 447 (1) -- bridged race flag
        83. 448 (1) -- race imputation flag
        84. 449 (1) -- race recode (3 categories)
        85. 450 (1) -- race recode (5 categories)
        86. 451-483 (33) -- reserved positions
        87. 484-486 (3) -- Hispanic origin
        88. 487 (1) -- reserved
        89. 488 (1) -- Hispanic origin/race recode
     '''
     
    record_split = re\
        .compile(
            r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + 
            r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + 
            r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
            r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
            r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + 
            r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + 
            r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
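    try:
        # re.split() returns the captured groups; keep only the selected ones
        rs = np.array(record_split.split(row))[selected_indices]
    except Exception:
        # Malformed row: return a placeholder record of the same length
        rs = np.array(['-99'] * len(selected_indices))
    return rs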

Now, instead of using lambda we will use the extractInformation(...) method to split and convert our dataset.

Note that we pass only the function itself (its name, without parentheses) to
.map(...): .map(...) will hand over one element of the RDD to
extractInformation(...) at a time in each partition:

In [None]:
data_from_file_conv = data_from_file.map(extractInformation)
data_from_file_conv.map(lambda row: row).take(2)

## Global versus local scope

One of the things that you, as a prospective PySpark user, need to get used to is the
inherent parallelism of Spark. Even if you are proficient in Python, executing scripts
in PySpark requires shifting your thinking a bit.  

Spark can be run in two modes: local and cluster. When you run Spark locally,
your code might not differ from what you are used to when running plain Python:
changes would most likely be more syntactic than anything else, but with an added
twist that data and code can be copied between separate worker processes.  

However, taking the same code and deploying it to a cluster might cause a lot of
head-scratching if you are not careful. This requires understanding how Spark
executes a job on the cluster.  

In the cluster mode, when a job is submitted for execution, the job is sent to
the driver (or a master) node. The driver node creates a DAG (see Chapter 1,
Understanding Spark) for a job and decides which executor (or worker) nodes will run
specific tasks.  

The driver then instructs the workers to execute their tasks and return the results
to the driver when done. Before that happens, however, the driver prepares each
task's closure: A set of variables and methods present on the driver for the worker
to execute its task on the RDD.  

This set of variables and methods is inherently static within the executors' context,
that is, each executor gets a copy of the variables and methods from the driver. If,
when running the task, the executor alters these variables or overwrites the methods,
it does so without affecting either other executors' copies or the variables and
methods of the driver. This might lead to some unexpected behavior and runtime
bugs that can sometimes be really hard to track down.
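The copy semantics described above can be mimicked in plain Python. This is only a sketch of the idea, not Spark code: `copy.deepcopy` stands in for the serialization Spark performs when it ships a closure to an executor, and the `counter`, `partitions`, and `run_task` names are hypothetical.

```python
import copy

counter = {"value": 0}              # variable living on the "driver"
partitions = [[1, 2], [3, 4], [5]]  # hypothetical split of the data

def run_task(partition, closure):
    # Each task mutates only its own copy of the closure.
    for x in partition:
        closure["value"] += x
    return closure["value"]

# The "driver" ships an independent copy of the closure to every task.
task_results = [run_task(p, copy.deepcopy(counter)) for p in partitions]

print(task_results)      # per-task totals
print(counter["value"])  # the driver's copy is untouched
```

Each task sees its own total, while the variable on the driver never changes. This is why code that updates a driver-side variable from inside a task tends to behave unexpectedly on a cluster.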

## Transformations  

Transformations shape your dataset. These include mapping, filtering, joining, and
transcoding the values in your dataset. In this section, we will showcase some of the
transformations available on RDDs.  
Due to space constraints we include only the most often used
transformations and actions here.  

Since RDDs are schema-less, in this section we assume you know the schema of the
produced dataset.  


### The .map(...) transformation  

It can be argued that you will use the .map(...) transformation most often. The
method is applied to each element of the RDD: in the case of the
data_from_file_conv dataset, you can think of this as a transformation of each row.  


In this example, we will create a new dataset that will convert year of death into a
numeric value:

In [None]:
data_2014 = data_from_file_conv.map(lambda row: int(row[16]))
data_2014.take(10)

You can of course bring more columns over, but you would have to package them
into a tuple, dict, or a list. Let's also include the original 17th element of the row
(index 16) so that we can confirm our .map(...) works as intended:

In [None]:
data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
data_2014_2.take(10)

### The .filter(...) transformation  
Another frequently used transformation is the .filter(...) method, which allows
you to select elements from your dataset that fit specified criteria. As an example,
from the data_from_file_conv dataset, let's count how many people died in an
accident in 2014:  


In [None]:
data_filtered = data_from_file_conv.filter(lambda row: row[16] == '2014' and row[21] == '0')
data_filtered.count()

In [None]:
data_filtered_2 = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
data_filtered_2.count()

### The .flatMap(...) transformation  

The .flatMap(...) method works similarly to .map(...), but it flattens the result:
the items of the sequence returned for each row become separate elements of the new RDD.

In [None]:
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
data_2014_flat.take(10)

You can compare this result with the results of the command that generated
data_2014_2 previously. Note also that the .flatMap(...) method can be used to
filter out malformed records when you need to parse your input. Under the hood,
the .flatMap(...) method treats the value returned for each row as a list and
then simply concatenates all of these lists; by returning an empty list for a
malformed record, that record is dropped.
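The flattening behavior, including the empty-list trick for dropping bad records, can be sketched in plain Python. This is a simulation of the idea rather than Spark code; the `flat_map` and `parse` helpers and the sample records are invented for the illustration.

```python
from itertools import chain

def flat_map(func, records):
    # Apply func to every record and concatenate the resulting iterables.
    return list(chain.from_iterable(func(r) for r in records))

def parse(raw):
    # Return a one-element list for a good record, an empty list otherwise.
    try:
        return [int(raw)]
    except ValueError:
        return []

raw_records = ["2014", "oops", "2013", ""]
parsed = flat_map(parse, raw_records)

# Returning a tuple per record yields a flat sequence, not a list of tuples.
flattened = flat_map(lambda year: (year, year + 1), parsed)

print(parsed)
print(flattened)
```

The two malformed inputs contribute nothing to `parsed`, and `flattened` contains four plain numbers rather than two tuples.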

### The .distinct(...) transformation  
This method returns an RDD of the distinct values in the dataset; to get the distinct
values of a single column, first extract that column with .map(...). It is extremely
useful if you want to get to know your dataset or validate it. Let's check if the gender
column contains only males and females; that would verify that we parsed the
dataset properly. Let's run the following code:  



In [None]:
distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct()

distinct_gender.collect()

First, we extract only the column that contains the gender. Next, we use the
.distinct() method to select only the distinct values. Lastly, we use the
.collect() method to bring the values back to the driver and print them on the screen.  

*[Note that this is an expensive method and should be used sparingly and
only when necessary as it shuffles the data around.]*

### The .sample(...) transformation  

The .sample(...) method returns a randomized sample from the dataset. The first
parameter specifies whether the sampling should be with replacement, the second
parameter defines the fraction of the data to return, and the third is the seed for the
pseudo-random number generator:  


In [26]:
fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)

In this example, we selected a randomized sample of roughly 10% of the original dataset
(the fraction is an expected size, not an exact one).
To confirm this, let's print the sizes of the datasets:  

In [None]:
print(f'Original dataset:{data_from_file_conv.count()}, sample: {data_sample.count()}')

We use the .count() action that counts all the records in the corresponding RDDs.
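Why the sample size is close to, but usually not exactly, the requested fraction can be seen in a small plain-Python sketch. It assumes the simplest form of sampling without replacement, where each record is kept independently with probability equal to the fraction; Spark's own sampler is implemented differently, and the `bernoulli_sample` helper is hypothetical.

```python
import random

def bernoulli_sample(records, fraction, seed):
    # Keep each record independently with probability `fraction`.
    rng = random.Random(seed)
    return [r for r in records if rng.random() < fraction]

records = list(range(10_000))
sample_a = bernoulli_sample(records, 0.1, 666)
sample_b = bernoulli_sample(records, 0.1, 666)

print(len(sample_a))         # close to 1,000, but usually not exactly 1,000
print(sample_a == sample_b)  # True: the same seed gives the same sample
```

Fixing the seed makes the sample reproducible, which is handy when you want to rerun an analysis on the same subset.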

### The .leftOuterJoin(...) transformation  

.leftOuterJoin(...), just like in the SQL world, joins two RDDs based on the
keys found in both datasets, and returns records from the left RDD with records
from the right one appended in places where the two RDDs match:

In [None]:
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c', 10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b',  '6'), ('d',15)])
rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.collect()

*[This is another expensive method and should be used sparingly and only
when necessary as it shuffles the data around causing a performance hit.]*

What you can see here are all the elements from rdd1 and their corresponding
values from rdd2. The key 'a' shows up two times in rdd3 because 'a' appears
twice in rdd2. The key 'b' from rdd1 shows up only once and is joined with the
value '6' from rdd2. Two things are missing: the key 'c' from rdd1 does not have
a corresponding key in rdd2, so the value in the returned tuple shows as None,
and, since we were performing a left outer join, the key 'd' from rdd2
disappeared as expected.

If we used the .join(...) method instead we would have got only the values
for 'a' and 'b' as these two values intersect between these two RDDs. Run the
following code:

In [None]:
rdd4 = rdd1.join(rdd2)
rdd4.collect()

Another useful method is .intersection(...), which returns the records that are
equal in both RDDs. Execute the following code:

In [None]:
rdd5 = rdd1.intersection(rdd2)
rdd5.collect()
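The difference between the three operations can be traced by hand in plain Python. The sketch below mirrors the semantics on ordinary lists holding the same data as rdd1 and rdd2; it is not Spark code, and the order in which Spark returns the records may differ.

```python
from collections import defaultdict

left = [('a', 1), ('b', 4), ('c', 10)]
right = [('a', 4), ('a', 1), ('b', '6'), ('d', 15)]

# Group the right-hand values by key.
right_by_key = defaultdict(list)
for key, value in right:
    right_by_key[key].append(value)

# Left outer join: keep every left record, pad with None when no match.
left_outer = [(k, (v, w)) for k, v in left
              for w in right_by_key.get(k, [None])]

# Inner join: keep only keys present on both sides.
inner = [(k, (v, w)) for k, v in left
         for w in right_by_key.get(k, [])]

# Intersection: whole (key, value) records that occur in both datasets.
common = sorted(set(left) & set(right))

print(left_outer)
print(inner)
print(common)
```

Note that the intersection compares whole records, so only ('a', 1) qualifies, whereas the joins match on the key alone.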

### The .repartition(...) transformation  

Repartitioning the dataset changes the number of partitions that the dataset is
divided into. This functionality should be used sparingly and only when really
necessary as it shuffles the data around, which in effect results in a significant
hit in terms of performance:  


In [None]:
rdd1 = rdd1.repartition(4)
len(rdd1.glom().collect())

The preceding code prints out 4 as the new number of partitions.
The .glom() method, in contrast to .collect(), produces a list where each element
is another list of all elements of the dataset present in a specified partition; the main
list returned has as many elements as the number of partitions.
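The shapes returned by .glom().collect() and .collect() can be compared with a plain-Python stand-in. The placement of records in the four partitions below is hypothetical; the real placement is decided by Spark.

```python
# Hypothetical contents of four partitions (the real placement is up to Spark).
partitions = [[('a', 1)], [], [('b', 4), ('c', 10)], []]

# Shape of .glom().collect(): one inner list per partition.
glommed = [list(p) for p in partitions]

# Shape of .collect(): a single flat list of elements.
collected = [element for p in partitions for element in p]

print(len(glommed))    # number of partitions
print(len(collected))  # number of elements
```

Empty partitions still appear as empty lists in the glommed result, which makes it a quick way to spot skewed data placement.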

## Actions  

Actions, in contrast to transformations, execute the scheduled task on the
dataset; once you have finished defining your transformations, calling an action
runs them. An action might execute no transformations at all (for example, .take(n) will
just return n records from an RDD even if you did not apply any transformations to it)
or execute the whole chain of transformations.  


### The .take(...) method  

This is arguably the most useful method (and, like .map(...), one of the most used).
It is preferred to .collect(...) because it returns only the first n rows, reading
from as few partitions as it needs, whereas .collect(...) returns the whole RDD.
This is especially important when you deal with large datasets:  


In [None]:
data_first = data_from_file_conv.take(1)
data_first

If you want somewhat randomized records you can use .takeSample(...)
instead, which takes three arguments: the first specifies whether the sampling
should be with replacement, the second specifies the number of records to return,
and the third is a seed for the pseudo-random number generator:

In [None]:
data_take_sampled = data_from_file_conv.takeSample(False, 1, 667)
data_take_sampled

### The .collect(...) method  
This method returns all the elements of the RDD to the driver. As we have just
provided a caution about it, we will not repeat ourselves here.  


### The .reduce(...) method  

The .reduce(...) method reduces the elements of an RDD using a specified method.
You can use it to sum the elements of your RDD:  

In [None]:
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)

We first create a list of all the values of the rdd1 using the .map(...) transformation,
and then use the .reduce(...) method to process the results. The reduce(...)
method, on each partition, runs the summation method (here expressed as a lambda)
and returns the sum to the driver node where the final aggregation takes place.

* **WARNING!!!**: A word of caution is necessary here. The functions passed as a reducer
need to be associative, that is, changing how the elements are grouped does not
change the result, and commutative, that is, changing the order of operands
does not change the result either.
An example of the associativity rule is (5 + 2) + 3 = 5 + (2 + 3), and of the
commutativity rule is 5 + 2 + 3 = 3 + 2 + 5. Thus, you need to be careful about
what functions you pass to the reducer.
If you ignore the preceding rule, you might run into trouble (assuming
your code runs at all). For example, let's assume we have the following
RDD (with one partition only!):

In [37]:
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)

If we reduce the data by dividing the running result by each subsequent
element, we would expect a value of 10:

In [None]:
works = data_reduce.reduce(lambda x, y: x / y)
works

However, if you were to partition the data into three partitions, the result
will be wrong:

In [None]:
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)
data_reduce.reduce(lambda x, y: x / y)
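To see where the wrong answer comes from, the two-stage reduction can be simulated in plain Python. This sketch is not Spark code: `simulated_reduce` is a hypothetical helper that reduces each partition first and then reduces the partial results, and the even split into three slices of two elements is an assumption about how the data is partitioned.

```python
from functools import reduce

def simulated_reduce(func, partitions):
    # Reduce every partition on its own, then reduce the partial results.
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)

def divide(x, y):
    return x / y

def add(x, y):
    return x + y

values = [1, 2, .5, .1, 5, .2]
three_slices = [values[0:2], values[2:4], values[4:6]]

# Division is neither associative nor commutative: the split changes the result.
div_one = simulated_reduce(divide, [values])
div_three = simulated_reduce(divide, three_slices)

# Addition is both: the split does not matter (up to floating-point rounding).
add_one = simulated_reduce(add, [values])
add_three = simulated_reduce(add, three_slices)

print(div_one, div_three)
print(add_one, add_three)
```

With three slices the partial results are 0.5, 5 and 25, and dividing those in turn gives about 0.004 instead of 10. Addition gives the same total either way.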

* **The .reduceByKey(...)** method works in a similar way to the .reduce(...)
method, but it performs a reduction on a key-by-key basis:

In [None]:
data_key = sc.parallelize(
[('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1), ('d', 3)],4)

data_key.reduceByKey(lambda x, y: x + y).collect()
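The per-key reduction can be followed step by step in plain Python. This is a sketch of the semantics only, not Spark code: the hypothetical `reduce_by_key` helper keeps one running result per key, using the same data as data_key.

```python
def reduce_by_key(func, pairs):
    # Keep one running result per key, combining values with func.
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals

pairs = [('a', 4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3)]
summed = reduce_by_key(lambda x, y: x + y, pairs)

print(sorted(summed.items()))
```

As with .reduce(...), the function should be associative and commutative, because in Spark the values for one key may be combined in any order across partitions.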

### The .count(...) method  

The .count(...) method counts the number of elements in the RDD. Use the
following code:

In [None]:
data_reduce.count()

This code will produce 6, the exact number of elements in the data_reduce RDD.  

The .count(...) method produces the same result as the following method, but it
does not require moving the whole dataset to the driver:

In [None]:
len(data_reduce.collect()) # WRONG -- DON'T DO THIS!

If your dataset is in a key-value form, you can use the .countByKey() method to get
the counts of distinct keys. Run the following code:

In [None]:
data_key.countByKey().items()
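For comparison, the same per-key counts can be computed in plain Python with `collections.Counter`. This is only meant to show what is being counted (the number of records per key, not the sum of their values); it is not Spark code.

```python
from collections import Counter

pairs = [('a', 4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3)]

# Count how many records carry each key; the values are ignored.
key_counts = Counter(key for key, _ in pairs)

print(sorted(key_counts.items()))
```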

### The .saveAsTextFile(...) method  

As the name suggests, the .saveAsTextFile(...) method takes the RDD and saves it to text files,
each partition to a separate file. The path you pass becomes a directory that holds one part file per partition:  

In [13]:
data_key.saveAsTextFile('data_key.txt')


To read it back, you need to parse it back as all the rows are treated as strings:

In [None]:
def parseInput(row):
    
    pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
    row_split = pattern.split(row)
    
    return (row_split[1], int(row_split[2]))
    
data_key_reread = sc \
    .textFile('data_key.txt') \
    .map(parseInput)
data_key_reread.collect()
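The parsing step can be checked on its own, without Spark, by running it on a couple of sample lines. The sketch assumes each saved line looks like `('a', 4)`; the `parse_input` and `parse_with_literal_eval` names are illustrative. The second helper is an alternative that uses `ast.literal_eval` from the standard library, which also copes with multi-digit values that the single-digit pattern would not match.

```python
import ast
import re

def parse_input(row):
    # Same pattern as above: one lowercase letter and one digit.
    pattern = re.compile(r"\('([a-z])', ([0-9])\)")
    parts = pattern.split(row)
    return (parts[1], int(parts[2]))

def parse_with_literal_eval(row):
    # Alternative: let Python evaluate the tuple literal safely.
    return ast.literal_eval(row)

sample_lines = ["('a', 4)", "('b', 3)"]
parsed = [parse_input(line) for line in sample_lines]
parsed_alt = [parse_with_literal_eval(line) for line in sample_lines + ["('a', 12)"]]

print(parsed)
print(parsed_alt)
```

Either function could be handed to .map(...); the regular-expression version is closer to the original code, while the `literal_eval` version is less sensitive to the exact shape of the values.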

### The .foreach(...) method  

This is a method that applies the same function to each element of the RDD in an
iterative way; in contrast to .map(...), the .foreach(...) method applies a defined
function to each record in a one-by-one fashion and does not return a new RDD. It is
useful when you want to save the data to a database that is not natively supported by PySpark.

Here, we'll use it to print (to CLI - not the Jupyter Notebook) all the records that are
stored in data_key RDD:

In [None]:
def f(x):
    print(x)
data_key.foreach(f)

If you now navigate to the CLI you should see all the records printed out. Note that
the order will most likely be different every time.

## Summary

RDDs are the backbone of Spark; these schema-less data structures are the most
fundamental data structures that we will deal with within Spark.

In this chapter, we presented ways to create RDDs by means of the
.parallelize(...) method as well as by reading data from text files. Also, some
ways of processing unstructured data were shown.

Transformations in Spark are lazy: they are only applied when an action is called. In
this chapter, we discussed and presented the most commonly used transformations
and actions; the PySpark documentation contains many more:
http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.

One major distinction between Scala and Python RDDs is speed: Python RDDs can
be much slower than their Scala counterparts.

In the next chapter we will walk you through a data structure that made PySpark
applications perform on par with those written in Scala: the DataFrames.