# Abstracting Data with RDDs

## Introduction

Resilient Distributed Datasets (RDDs) are collections of immutable JVM objects that are distributed across an Apache Spark cluster.

An RDD is the most fundamental dataset type of Apache Spark; any action on a Spark DataFrame eventually gets translated into a highly optimized execution of transformations and actions on RDDs.

Data in an RDD is split into chunks based on a key and then dispersed across all the executor nodes. RDDs are highly resilient; that is, they are able to recover quickly from any issues because the same data chunks are replicated across multiple executor nodes. Thus, even if one executor fails, another will still process the data. This allows you to perform your functional calculations against your dataset very quickly by harnessing the power of multiple nodes. RDDs also keep a log of all the execution steps applied to each chunk. This, on top of the data replication, speeds up the computations and, if anything goes wrong, RDDs can still recover the portion of the data lost due to an executor error.

While it is common to lose a node in distributed environments (for example, due to connectivity issues or hardware problems), distribution and replication of the data defend against data loss, while data lineage allows the system to recover quickly.

## Creating RDDs

There are two ways to create an RDD in PySpark: you can either use the parallelize() method on a collection (a list or an array of some elements), or reference a file (or files) located either locally or through an external source.

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()
sc = spark.sparkContext


### Using parallelize() method

The following code snippet creates your RDD (myRDD) using
the sc.parallelize() method:

In [2]:
myRDD = sc.parallelize([('Mike', 19), ('June', 18), ('Rachel',16), ('Rob', 18),
('Scott', 17)])

To view what is inside your RDD, you can run the following code snippet:

In [3]:
myRDD.take(5)

[('Mike', 19), ('June', 18), ('Rachel', 16), ('Rob', 18), ('Scott', 17)]

#### How it works...
Let's break down the two methods in the preceding code snippet: sc.parallelize() and take().

##### Spark context parallelize method

The sc.parallelize() method is the SparkContext's parallelize method to create a parallelized collection.
This allows Spark to distribute the data across multiple nodes, instead of depending on a single node to process the data.

Now that we have created myRDD as a parallelized collection, Spark can operate on this data in parallel.
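As a rough mental model of a parallelized collection, the sketch below cuts a Python list into contiguous slices, one per partition. It runs without Spark; the helper name `split_into_partitions` and the even-slicing rule are assumptions made for illustration, and Spark's actual slicing may differ.

```python
def split_into_partitions(items, num_partitions):
    """Cut `items` into `num_partitions` contiguous slices of near-equal size.

    Hypothetical helper: an illustration only, not Spark's own algorithm.
    """
    n = len(items)
    return [
        items[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


people = [('Mike', 19), ('June', 18), ('Rachel', 16), ('Rob', 18), ('Scott', 17)]
partitions = split_into_partitions(people, 2)

# Each slice could now be handed to a different worker.
for index, chunk in enumerate(partitions):
    print(index, chunk)
```

With five elements and two partitions, the first slice holds two elements and the second holds three; no element is lost or duplicated.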

##### .take(...) method

Now that you have created your RDD (myRDD), we will use the take() method, an RDD action, to return values to the console (or notebook cell). Note that a common approach in PySpark is to use collect(), which returns all values in your RDD from the Spark worker nodes to the driver. This has performance implications when working with a large amount of data, because it translates to large volumes of data being transferred from the Spark worker nodes to the driver. For small amounts of data (such as in this recipe), this is perfectly fine, but, as a matter of habit, you should almost always use the take(n) method instead; it returns the first n elements of the RDD instead of the whole dataset. It is more efficient because it first scans one partition and uses the results from that partition to estimate how many additional partitions are needed to return the requested elements.
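The difference between collect() and take(n) can be illustrated with a plain-Python model in which an RDD is simply a list of partitions. The `collect_model` and `take_model` helpers are hypothetical names, and the model is deliberately simplified: it reads one partition at a time, whereas Spark estimates how many partitions to scan in each round.

```python
def collect_model(partitions):
    """Model of collect(): every element of every partition is returned."""
    return [element for partition in partitions for element in partition]


def take_model(partitions, n):
    """Model of take(n): stop reading partitions once n elements are found.

    Returns the elements and the number of partitions that were read.
    """
    result, scanned = [], 0
    for partition in partitions:
        if len(result) >= n:
            break
        scanned += 1
        result.extend(partition[:n - len(result)])
    return result, scanned


partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(collect_model(partitions))   # all nine elements reach the driver
print(take_model(partitions, 2))   # ([1, 2], 1): only one partition was read
```

In this model, asking for four elements reads two partitions and leaves the third untouched, which is the saving that take(n) offers over collect().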

### Reading data from files

We will create an RDD by reading a local file in PySpark.

Note that while this recipe is specific to reading local files, a similar syntax can be applied for Hadoop, AWS S3, Azure WASBs, and/or Google Cloud Storage.

In [4]:
directory = "/home/elvin/Documents/Engenharia-De-Dados/Estudos/PySpark/"

myRDD = (sc.textFile(directory + 'airport-codes-na.txt', minPartitions=4, use_unicode=True)).map(lambda element: element.split("\t"))

In [5]:
myRDD.take(5)

[['City', 'State', 'Country', 'IATA'],
 ['Abbotsford', 'BC', 'Canada', 'YXX'],
 ['Aberdeen', 'SD', 'USA', 'ABR'],
 ['Abilene', 'TX', 'USA', 'ABI'],
 ['Akron', 'OH', 'USA', 'CAK']]

In [6]:
myRDD.count()  # number of rows in the RDD

527

In [7]:
myRDD.getNumPartitions()  # number of partitions backing this RDD

4

#### How it works...

The first code snippet, which reads the file and returns values via take(), can be broken down into its two components: sc.textFile() and map().

##### .textFile(...) method

To read the file, we are using SparkContext's textFile() method.

Only the first parameter is required, which indicates the location of the text file as per ~/data/flights/airport-codes-na.txt. There are two optional parameters as well:

* minPartitions: Indicates the minimum number of partitions that make up the RDD. The Spark engine can often determine the best number of partitions based on the file size, but you may want to change the number of partitions for performance reasons and, hence, the ability to specify the minimum number.
* use_unicode: Engage this parameter if you are processing Unicode data.

Note that if you were to execute this statement without the subsequent map() function, the resulting RDD would not account for the tab delimiter; each element would simply be one line of the file as a single string.

##### .map(...) method

To make sense of the tab delimiter within an RDD, we will use the .map(...) function to transform the data from a list of strings to a list of lists.

The key components of this map transformation are:
* lambda: An anonymous function (that is, a function defined without a name) composed of a single expression
* split: Python's built-in str.split() method, called on each line to split the string around a delimiter; in this case, our delimiter is a tab (that is, \t). This is not the split function in pyspark.sql.functions, which operates on DataFrame columns.
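The effect of the lambda and split pair can be checked on ordinary strings, with no Spark session needed. In the sketch below, the two sample lines mirror rows shown in the earlier output; the variable names are illustrative only.

```python
# Two raw lines as textFile() would deliver them: one string per line.
lines = [
    "City\tState\tCountry\tIATA",
    "Abbotsford\tBC\tCanada\tYXX",
]

# The same anonymous function that the recipe passes to .map(...).
parse = lambda element: element.split("\t")

# Python's built-in map() applies it to each line, like RDD.map() does.
rows = list(map(parse, lines))

print(rows[1])  # ['Abbotsford', 'BC', 'Canada', 'YXX']
```

Each string becomes a list of four fields, which is why the RDD changes from a list of strings to a list of lists.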

## Partitions and performance

A key aspect of partitions for your RDD is that the more partitions you have, the higher the parallelism. Potentially, having more partitions will improve your query performance.

In [8]:
myRDD = sc.textFile(directory + "departuredelays.csv").map(lambda x: x.split(","))

In [9]:
myRDD.count()

1391579

In [10]:
myRDD.getNumPartitions()

2

In [11]:
myRDD = sc.textFile(directory + "departuredelays.csv", minPartitions=8).map(lambda x: x.split(","))

In [12]:
myRDD.count()

1391579

## Overview of RDD transformations

There are two types of operation that can be used to shape data in an RDD: transformations and actions. A transformation, as the name suggests, transforms one RDD into another. In other words, it takes an existing RDD and transforms it into one or more output RDDs. In the preceding recipes, we used a map() function, which is an example of a transformation, to split the data by its tab delimiter.

Transformations are lazy (unlike actions). They only get executed when an action is called on an RDD. For example, calling the count() function is an action.
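Lazy evaluation can be demonstrated by analogy with Python's built-in map(), which is also lazy. This is only an analogy and not how Spark schedules work: the `split_line` helper and the `log` list are hypothetical, added to record when the function actually runs.

```python
log = []


def split_line(line):
    """Split a CSV line and record that the function was really called."""
    log.append(line)
    return line.split(",")


lines = ["01011245,6,602,ABE,ATL", "01020600,-8,369,ABE,DTW"]

# Like a transformation: this only describes the work, nothing runs yet.
lazy_rows = map(split_line, lines)
calls_before = len(log)

# Like an action such as count(): consuming the result triggers the work.
count = sum(1 for _ in lazy_rows)
calls_after = len(log)

print(calls_before, count, calls_after)  # 0 2 2
```

No line is split until the counting step consumes the iterator, just as Spark defers transformations until an action asks for a result.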

In [13]:
airports = (sc.textFile(directory + 'airport-codes-na.txt')).map(lambda x: x.split("\t"))

In [14]:
airports.take(5)

[['City', 'State', 'Country', 'IATA'],
 ['Abbotsford', 'BC', 'Canada', 'YXX'],
 ['Aberdeen', 'SD', 'USA', 'ABR'],
 ['Abilene', 'TX', 'USA', 'ABI'],
 ['Akron', 'OH', 'USA', 'CAK']]

In [15]:
flights = (sc.textFile(directory + 'departuredelays.csv').map(lambda x: x.split(",")))

In [16]:
flights.take(5)

[['date', 'delay', 'distance', 'origin', 'destination'],
 ['01011245', '6', '602', 'ABE', 'ATL'],
 ['01020600', '-8', '369', 'ABE', 'DTW'],
 ['01021245', '-2', '602', 'ABE', 'ATL'],
 ['01020605', '-4', '602', 'ABE', 'ATL']]

The transformations include the following common tasks:
* Removing the header line from your text file: zipWithIndex()
* Selecting columns from your RDD: map()
* Running a WHERE (filter) clause: filter()
* Getting the distinct values: distinct()
* Getting the number of partitions: getNumPartitions()
* Determining the size of your partitions (that is, the number of elements within each partition): mapPartitionsWithIndex()

### .map(...) transformation

The map(f) transformation returns a new RDD formed by passing each
element through a function, f.

In [17]:
airports.map(lambda c: (c[0], c[1])).take(5)

[('City', 'State'),
 ('Abbotsford', 'BC'),
 ('Aberdeen', 'SD'),
 ('Abilene', 'TX'),
 ('Akron', 'OH')]

### .filter(...) transformation

The filter(f) transformation returns a new RDD based on selecting elements for which the f function returns true.

In [18]:
# Use filter() to keep rows where the second column == "WA"
airports.map(lambda c: (c[0], c[1])).filter(lambda c: c[1] == "WA").take(5)

[('Bellingham', 'WA'),
 ('Moses Lake', 'WA'),
 ('Pasco', 'WA'),
 ('Pullman', 'WA'),
 ('Seattle', 'WA')]
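To check the logic of a map() and filter() chain without a Spark session, the same style of lambdas can be applied to an ordinary list with Python's built-in map() and filter(). The rows below are the first few shown earlier for the airports file, and the filter value "TX" is chosen only because it appears in that sample; the built-ins mimic the semantics but do not distribute any work.

```python
airports_sample = [
    ['City', 'State', 'Country', 'IATA'],
    ['Abbotsford', 'BC', 'Canada', 'YXX'],
    ['Aberdeen', 'SD', 'USA', 'ABR'],
    ['Abilene', 'TX', 'USA', 'ABI'],
    ['Akron', 'OH', 'USA', 'CAK'],
]

# Select the first two columns, as in the map() recipe.
city_state = map(lambda c: (c[0], c[1]), airports_sample)

# Keep only the pairs whose second column equals "TX".
texas = list(filter(lambda c: c[1] == "TX", city_state))

print(texas)  # [('Abilene', 'TX')]
```

The predicate passed to filter() receives each element produced by the preceding map(), so it indexes into the (city, state) tuple and not into the original four-column row.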

### .flatMap(...) transformation

The flatMap(f) transformation is similar to map, but the new RDD flattens 
out all of the elements (that is, a sequence of events).

In [20]:
airports.filter(lambda c: c[1] =="WA").map(lambda c: (c[0], c[1])).flatMap(lambda x: x).take(10)

['Bellingham',
 'WA',
 'Moses Lake',
 'WA',
 'Pasco',
 'WA',
 'Pullman',
 'WA',
 'Seattle',
 'WA']
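The contrast between map and flatMap can be reproduced in plain Python by treating flatMap as a map followed by flattening one level. This is a sketch of the semantics only; `itertools.chain.from_iterable` stands in for the flattening step.

```python
from itertools import chain

pairs = [('Bellingham', 'WA'), ('Moses Lake', 'WA'), ('Pasco', 'WA')]

# map() keeps one output element per input element: the tuples stay intact.
mapped = list(map(lambda x: x, pairs))

# flatMap() is modeled here as map() followed by flattening one level.
flat_mapped = list(chain.from_iterable(map(lambda x: x, pairs)))

print(mapped)       # three (city, state) tuples
print(flat_mapped)  # ['Bellingham', 'WA', 'Moses Lake', 'WA', 'Pasco', 'WA']
```

Three input tuples yield three elements under map but six under the flatMap model, because each tuple is unpacked into its members.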

### .distinct() transformation

The distinct() transformation returns a new RDD containing the distinct elements of the source RDD.

In [21]:
airports.map(lambda c: c[2]).distinct().take(5)

['Country', 'Canada', 'USA']
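Conceptually, distinct() behaves like building a set from the elements. A tiny plain-Python analogue follows; the input list is a made-up sample of the third column, and, as with a set, the order of the distinct values should not be relied on.

```python
# A made-up sample of the "Country" column, header included.
countries = ['Country', 'Canada', 'USA', 'USA', 'USA', 'Canada']

# A set keeps exactly one copy of each value.
unique_countries = set(countries)

print(len(countries), len(unique_countries))  # 6 3
```

Note that the header value 'Country' survives as a distinct element, which matches the output above and is one reason to remove the header row before analysis.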

### .sample(...) transformation

The sample(withReplacement, fraction, seed) transformation samples a fraction of the data, with or without replacement (the withReplacement parameter), based on a random seed.

In [22]:
flights.map(lambda c: c[3]).sample(False, 0.001, 42).take(5)

['ABE', 'ABQ', 'ACV', 'AEX', 'ALB']
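Sampling without replacement can be pictured as flipping a biased coin for each element: every element is kept independently with probability equal to the fraction, so the sample size is only approximately fraction times the element count. The sketch below models that idea with Python's random module; `sample_model` is a hypothetical helper and will not reproduce the exact elements Spark picks for the same seed.

```python
import random


def sample_model(items, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)  # a fixed seed makes the sample repeatable
    return [item for item in items if rng.random() < fraction]


# Origin codes taken from the output above, repeated to form a larger list.
origins = ['ABE', 'ABQ', 'ACV', 'AEX', 'ALB'] * 200

first_run = sample_model(origins, 0.01, 42)
second_run = sample_model(origins, 0.01, 42)

print(len(origins), len(first_run))
print(first_run == second_run)  # True: same seed, same sample
```

Fixing the seed is what makes a sampled analysis reproducible from one run to the next.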

### .join(...) transformation

The join(RDD') transformation returns an RDD of (key, (val_left, val_right)) when called on RDD (key, val_left) with RDD (key, val_right). Outer joins are supported through left outer join, right outer join, and full outer join (leftOuterJoin(), rightOuterJoin(), and fullOuterJoin()).

In [23]:
flt = flights.map(lambda c: (c[3], c[0]))

air = airports.map(lambda c: (c[3], c[1]))

flt.join(air).take(5)

[('ADQ', ('01011710', 'AK')),
 ('ADQ', ('01021710', 'AK')),
 ('ADQ', ('01020815', 'AK')),
 ('ADQ', ('01031710', 'AK')),
 ('ADQ', ('01030815', 'AK'))]
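The shape of the join result, (key, (val_left, val_right)), is easy to verify with a small plain-Python inner join. The `inner_join_model` helper is hypothetical, and the sample pairs reuse values from the outputs above; in this small sample, 'ABE' and 'YXX' have no partner on the other side, so they drop out of an inner join.

```python
from collections import defaultdict


def inner_join_model(left, right):
    """Pair every left value with every right value that shares its key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]


# (origin, date) pairs and (IATA, state) pairs, as in the recipe.
flt_sample = [('ADQ', '01011710'), ('ADQ', '01021710'), ('ABE', '01011245')]
air_sample = [('ADQ', 'AK'), ('YXX', 'BC')]

joined = inner_join_model(flt_sample, air_sample)
print(joined)
# [('ADQ', ('01011710', 'AK')), ('ADQ', ('01021710', 'AK'))]
```

An outer join would keep the unmatched keys as well, filling the missing side with None.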

### .repartition(...) transformation

The repartition(n) transformation repartitions the RDD into n partitions by randomly reshuffling and uniformly distributing data across the network.

In [24]:
flights.getNumPartitions()

2

In [27]:
flights2 = flights.repartition(8)
flights2.getNumPartitions()

8
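The goal of repartitioning, evenly sized partitions, can be illustrated with a simplified model that deals elements out in round-robin order. The `repartition_model` helper is hypothetical and is not Spark's actual shuffle, which moves data between executors over the network; the sketch only shows how a skewed layout becomes balanced.

```python
def repartition_model(partitions, n):
    """Deal all elements into n new partitions in round-robin order."""
    new_partitions = [[] for _ in range(n)]
    position = 0
    for partition in partitions:
        for element in partition:
            new_partitions[position % n].append(element)
            position += 1
    return new_partitions


# Two skewed partitions: ten elements in one, two in the other.
skewed = [list(range(10)), [10, 11]]
balanced = repartition_model(skewed, 4)

print([len(p) for p in skewed])    # [10, 2]
print([len(p) for p in balanced])  # [3, 3, 3, 3]
```

Balanced partitions matter because a stage finishes only when its slowest task does, and an oversized partition makes for a slow task.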

### .zipWithIndex() transformation

The zipWithIndex() transformation appends (or ZIPs) the RDD with the element indices. This is very handy when wanting to remove the header row (first row) of a file.

In [28]:
ac = airports.map(lambda c: (c[0], c[3]))
ac.zipWithIndex().take(5)

[(('City', 'IATA'), 0),
 (('Abbotsford', 'YXX'), 1),
 (('Aberdeen', 'ABR'), 2),
 (('Abilene', 'ABI'), 3),
 (('Akron', 'CAK'), 4)]
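Removing the header row then amounts to filtering on the index and dropping it again. The plain-Python sketch below uses enumerate() as a stand-in for zipWithIndex(); the variable names are illustrative. One point to watch in Python 3: each (element, index) pair reaches the lambda as a single tuple, so a two-parameter lambda such as `lambda row, idx: ...` fails with a TypeError reporting a missing positional argument.

```python
rows = [('City', 'IATA'), ('Abbotsford', 'YXX'), ('Aberdeen', 'ABR')]

# Stand-in for zipWithIndex(): build (element, index) pairs.
indexed = [(row, idx) for idx, row in enumerate(rows)]

# Each pair arrives as ONE argument, so index into it: pair[1] is the index.
without_header = [pair[0] for pair in filter(lambda pair: pair[1] > 0, indexed)]
print(without_header)  # [('Abbotsford', 'YXX'), ('Aberdeen', 'ABR')]

# A two-parameter lambda fails, because only one argument (the tuple) is passed.
try:
    list(filter(lambda row, idx: idx > 0, indexed))
    raised_type_error = False
except TypeError:
    raised_type_error = True
print(raised_type_error)  # True
```

On an RDD, the equivalent chain would presumably be a filter on the second member of each pair followed by a map that keeps the first member.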

23/09/11 22:02:42 ERROR Executor: Exception in task 0.0 in stage 23.0 (TID 41)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 822, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/pyspark/rdd.py", line 2830, in takeUpToNumLeft
    yield next(iterator)
          ^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
TypeError: <lambda>() missing 1 required positional argument: 'idx'

