# Spark Fundamentals

# RDD (Resilient Distributed Dataset)
A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an ***immutable***, ***partitioned collection of elements*** that can be ***operated on in parallel***.

**Resilient**, i.e. fault-tolerant: using the RDD lineage graph, Spark can recompute partitions that are missing or damaged because of node failures.

**Distributed**: the data resides on multiple nodes in a cluster.

**Dataset**: a collection of partitioned data holding primitive values or values of values, e.g. tuples or other objects that represent records of the data you work with.

Everything we do in Spark comes down to one of three things: creating a new RDD, transforming an existing RDD into a new one, or calling an action on an RDD to compute a result.
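The toy model below sketches how these ideas fit together. It is plain Python, not Spark, and the class `ToyRDD` and its methods are hypothetical. Transformations such as `map` and `filter` only record a function and a parent RDD (the lineage). Actions such as `collect` and `count` trigger the actual computation. Because every partition can be rebuilt from its parent, a lost partition can simply be recomputed. Spark's real implementation is far more involved (shuffles, scheduling, caching).

```python
from typing import Callable, Iterable, List, Optional


class ToyRDD:
    """Hypothetical toy stand-in for an RDD: immutable, partitioned, lazily evaluated."""

    def __init__(self, partitions: Optional[List[list]] = None,
                 parent: Optional["ToyRDD"] = None,
                 fn: Optional[Callable[[Iterable], Iterable]] = None):
        self._source = partitions  # only set for a "base" RDD built from data
        self._parent = parent      # lineage: the RDD this one was derived from
        self._fn = fn              # per-partition function applied to the parent

    def num_partitions(self) -> int:
        if self._source is not None:
            return len(self._source)
        return self._parent.num_partitions()

    def compute(self, i: int) -> list:
        """(Re)compute partition i by walking the lineage back to the source data."""
        if self._source is not None:
            return list(self._source[i])
        return list(self._fn(self._parent.compute(i)))

    # Transformations: return a new ToyRDD and do no work yet
    def map(self, f):
        return ToyRDD(parent=self, fn=lambda part: (f(x) for x in part))

    def filter(self, pred):
        return ToyRDD(parent=self, fn=lambda part: (x for x in part if pred(x)))

    # Actions: actually compute the partitions
    def collect(self) -> list:
        return [x for i in range(self.num_partitions()) for x in self.compute(i)]

    def count(self) -> int:
        return sum(len(self.compute(i)) for i in range(self.num_partitions()))


base = ToyRDD(partitions=[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]])
evens_squared = base.filter(lambda x: x % 2 == 0).map(lambda x: x * x)  # nothing computed yet

print(evens_squared.collect())   # [0, 4, 16, 36, 64]
print(evens_squared.compute(1))  # [16]: one "lost" partition rebuilt from its lineage
```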

# Creating an RDD from a file

In [1]:
import pyspark

sc = pyspark.SparkContext("local", "PySparkWordCount")

In [26]:
data_file = "data/session_intro.txt"
raw_data = sc.textFile(data_file)

In [27]:
raw_data.count()

13

In [28]:
raw_data.take(5)

['Topic Abstract',
 '',
 'Designing an enterprise class streaming application has significant challenges. Such challenges include handling imperfections in data stream - e.g. out of order arrival of data, handling system recoverable or irrecoverable system failures, and guaranteeing consistent results.',
 '',
 'In real world applications these discrepancies are often exhibited and current distributed stream computation requires end user to handle such discrepancies and design custom solutions. ']
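`sc.textFile` is lazy: the file is only read when an action such as `count` or `take` runs, and each element of the resulting RDD is one line of the file without its line terminator (an optional `minPartitions` argument hints how many partitions to create). That is why the blank lines show up as `''` in the output above. The plain-Python sketch below mimics those semantics on a shortened excerpt of the same file; it is an analogue only, not how Spark reads files.

```python
# A shortened excerpt of the first lines of data/session_intro.txt shown above
text = (
    "Topic Abstract\n"
    "\n"
    "Designing an enterprise class streaming application has significant challenges.\n"
)

# Like sc.textFile: one element per line, without the line terminator
lines = text.splitlines()

print(len(lines))  # analogue of raw_data.count(): 3
print(lines[:2])   # analogue of raw_data.take(2): ['Topic Abstract', '']
```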

# Creating an RDD using parallelize


In [29]:
a = range(100)

data = sc.parallelize(a)

In [30]:
data.count()

100

In [31]:
data.take(5)

[0, 1, 2, 3, 4]
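`sc.parallelize(c, numSlices=None)` cuts a local collection into `numSlices` partitions; when `numSlices` is omitted it uses `sc.defaultParallelism`, which is 1 with the `"local"` master used here (`data.getNumPartitions()` reports the actual number). The sketch below, a hypothetical helper in plain Python, shows the idea of splitting a sequence into contiguous chunks whose sizes differ by at most one. It mirrors the idea rather than Spark's exact code.

```python
def slice_evenly(seq, num_slices):
    """Split seq into num_slices contiguous chunks whose sizes differ by at most one."""
    n = len(seq)
    return [seq[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]


parts = slice_evenly(list(range(100)), 3)
print([len(p) for p in parts])  # [33, 33, 34]
print(parts[0][:5])             # [0, 1, 2, 3, 4]
```

With Spark you would request the same split with `sc.parallelize(range(100), 3)`.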

In [32]:
#### Word count in Spark
import operator

# Get an RDD containing the lines of data_file
lines = sc.textFile(data_file)

# Split each line into words and assign a frequency of 1 to each word
words = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))

# Add up the 1s for each word to get its frequency
counts = words.reduceByKey(operator.add)

# Sort the counts in descending order of word frequency
sorted_counts = counts.sortBy(lambda x: x[1], ascending=False)

# Iterate over the results on the driver, one partition at a time, printing each word and its frequency
for word, count in sorted_counts.toLocalIterator():
    print(u"{} --> {}".format(word, count))


and --> 9
 --> 8
of --> 7
stream --> 5
to --> 4
streaming --> 4
structured --> 3
application --> 3
in --> 3
are --> 3
end --> 3
In --> 3
handling --> 3
discrepancies --> 2
data --> 2
consistency, --> 2
world --> 2
He --> 2
real --> 2
from --> 2
has --> 2
problems --> 2
system --> 2
on --> 2
I --> 2
will --> 2
such --> 2
Spark --> 2
talk --> 2
is --> 2
out --> 1
enterprise --> 1
while --> 1
challenges --> 1
significant --> 1
how --> 1
high-throughput --> 1
these --> 1
irrecoverable --> 1
8 --> 1
about --> 1
continuous --> 1
the --> 1
International --> 1
try --> 1
Topic --> 1
Speaker --> 1
introduced --> 1
notion --> 1
applications --> 1
Institute --> 1
arrival --> 1
modelling, --> 1
simplify --> 1
BFSI --> 1
current --> 1
solved --> 1
graduated --> 1
imperfection --> 1
fault-tolerance --> 1
where --> 1
explain --> 1
custom --> 1
queries --> 1
include --> 1
failures, --> 1
transactional --> 1
strong --> 1
order --> 1
provided --> 1
processing --> 1
component --> 1
Designing --> 1
handle 
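The same pipeline can be mimicked in plain Python to see what each step produces. The ` --> 8` row in the output above is most likely the empty string: `split(" ")` returns `''` for blank lines and for repeated or trailing spaces. Splitting on runs of whitespace with `line.split()` (no argument), i.e. `lines.flatMap(lambda line: line.split())` in the Spark version, avoids this. The helper `word_count` and the sample lines are hypothetical.

```python
import operator


def word_count(lines, split_line):
    """Plain-Python analogue of the Spark word count above (not Spark itself)."""
    # flatMap + map: every line -> many (word, 1) pairs
    pairs = [(word, 1) for line in lines for word in split_line(line)]

    # reduceByKey(operator.add): add up the values for each key
    counts = {}
    for word, one in pairs:
        counts[word] = operator.add(counts.get(word, 0), one)

    # sortBy(lambda x: x[1], ascending=False): most frequent first
    return sorted(counts.items(), key=lambda x: x[1], reverse=True)


# Hypothetical sample lines, including a blank line and a double space
lines = ["Spark and Spark", "", "streaming and  Spark"]

print(word_count(lines, lambda line: line.split(" ")))
# [('Spark', 3), ('and', 2), ('', 2), ('streaming', 1)]
print(word_count(lines, lambda line: line.split()))
# [('Spark', 3), ('and', 2), ('streaming', 1)]
```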

# Transformations

- **Filter transformation:** keeps only the elements of an RDD that satisfy a given condition, expressed as a function that returns True or False for each element.

In [43]:
# Keep only the lines that contain the word 'Spark'
normal_raw_data = raw_data.filter(lambda x: 'Spark' in x)

In [51]:
normal_raw_data.take(2)

['Structured streaming is the new component introduced from Spark 2.0 to simplify end to end development of continuous application where consistency, fault-tolerance and stream imperfection handling mechanisms are inbuilt. ',
 'In this talk I will try to explain problems faced while building high-throughput real world streaming application and how such problems can be solved using structured streaming and structured stream processing model provided in Spark 2.0. In specific I will talk about strong notion of prefix consistency, transactional source modelling, interactive stream queries and analytics on structured streams.']

In [56]:
normal_count = normal_raw_data.count()

In [57]:
normal_count

2
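The predicate passed to `filter` can be any Python function. Note that `'Spark' in x` is case-sensitive, so a line mentioning only "spark" would be missed. The sketch below uses a hypothetical helper `mentions_spark` on made-up sample lines, in plain Python; in Spark you would pass it as `raw_data.filter(lambda x: mentions_spark(x, case_sensitive=False))`.

```python
def mentions_spark(line, case_sensitive=True):
    """Hypothetical predicate usable with RDD.filter."""
    if case_sensitive:
        return "Spark" in line
    return "spark" in line.lower()


# Made-up sample lines for illustration
lines = ["Spark 2.0 adds structured streaming", "apache spark", "no match here", ""]

print([l for l in lines if mentions_spark(l)])                           # ['Spark 2.0 adds structured streaming']
print(sum(1 for l in lines if mentions_spark(l, case_sensitive=False)))  # 2
```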

# Map transformation

The map transformation applies a function to every element of an RDD and returns a new RDD with the results.

Here we want to read a CSV file, so we apply a lambda function that splits each line on commas.

In [100]:
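from pprint import pprint
from time import time

data_file = "data/status_data.csv"
raw_data = sc.textFile(data_file)

# Split every line on commas: each element becomes a list of fields
csv_data = raw_data.map(lambda x: x.split(","))

# map is lazy; the work happens when the take action runs
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print("Parse completed in {} seconds".format(round(tt, 3)))
pprint(head_rows[0:4])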

Parse completed in 0.069 seconds
[['"station_id"', '"bikes_available"', '"docks_available"', '"time"'],
 ['"2"', '"18"', '"9"', '"9/1/2015 00:00:02"'],
 ['"2"', '"18"', '"9"', '"9/1/2015 00:01:02"'],
 ['"2"', '"18"', '"9"', '"9/1/2015 00:02:02"']]
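As the output shows, a plain `split(",")` keeps the double quotes around every field, and it would also break a quoted field that itself contains a comma. Python's standard `csv` module handles quoting. The sketch below uses a hypothetical helper `parse_csv_line`, checked here in plain Python; with Spark it would be used as `raw_data.map(parse_csv_line)`.

```python
import csv


def parse_csv_line(line):
    """Parse one CSV line, honouring quotes (unlike a plain split(","))."""
    return next(csv.reader([line]))


line = '"2","18","9","9/1/2015 00:00:02"'
print(line.split(","))       # ['"2"', '"18"', '"9"', '"9/1/2015 00:00:02"']
print(parse_csv_line(line))  # ['2', '18', '9', '9/1/2015 00:00:02']
```

Since `csv.reader` accepts any iterable of strings, `raw_data.mapPartitions(csv.reader)` is an alternative that builds one reader per partition instead of one per line. The header row can be dropped with `header = raw_data.first()` followed by `raw_data.filter(lambda l: l != header)`.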


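# Using map with named functions

Instead of a lambda, any named Python function can be passed to map. Here parse_line turns each line into a (key, value) pair keyed by its first field, the station id.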

In [99]:
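def parse_line(line):
    """Split a CSV line and key it by its first field (the station id)."""
    elems = line.split(",")
    tag = elems[0]
    return (tag, elems)

# Each element becomes a (key, value) pair: (station_id, list_of_fields)
key_csv_data = raw_data.map(parse_line)
head_rows = key_csv_data.take(5)
pprint(head_rows[2:5])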

[('"2"', ['"2"', '"18"', '"9"', '"9/1/2015 00:01:02"']),
 ('"2"', ['"2"', '"18"', '"9"', '"9/1/2015 00:02:02"']),
 ('"2"', ['"2"', '"18"', '"9"', '"9/1/2015 00:03:03"'])]
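Once records are keyed, the pair-RDD operations apply. The sketch below assumes the column order shown above and a month/day/year time format; the helper `parse_status` and the third sample row (station 3) are hypothetical, while the first two rows come from the output above. In Spark, after dropping the header row (otherwise `int("station_id")` fails), the per-station maximum would be `raw_data.map(parse_status).mapValues(lambda v: v[0]).reduceByKey(max)`; here the same computation is done in plain Python.

```python
import csv
from datetime import datetime


def parse_status(line):
    """Turn one status_data.csv line into (station_id, (bikes, docks, time)).

    Assumes the column order shown above and a month/day/year time format.
    """
    station, bikes, docks, ts = next(csv.reader([line]))
    return int(station), (int(bikes), int(docks),
                          datetime.strptime(ts, "%m/%d/%Y %H:%M:%S"))


lines = [
    '"2","18","9","9/1/2015 00:00:02"',
    '"2","18","9","9/1/2015 00:01:02"',
    '"3","5","10","9/1/2015 00:00:02"',  # hypothetical row for a second station
]
pairs = [parse_status(l) for l in lines]

# Plain-Python analogue of mapValues(lambda v: v[0]).reduceByKey(max)
max_bikes = {}
for station, (bikes, _docks, _ts) in pairs:
    max_bikes[station] = max(bikes, max_bikes.get(station, bikes))

print(max_bikes)  # {2: 18, 3: 5}
```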


# The collect action

So far we have used the actions count and take. Another basic action is collect, which returns all the elements of the RDD to the driver program as a local Python list. Because the entire dataset must then fit in the driver's memory, collect has to be used with care, especially with large RDDs.

In [103]:
from time import time

t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print("Data collected in {} seconds".format(round(tt, 3)))

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/apundhir/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1028, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/apundhir/anaconda/envs/MySpark/lib/python3.5/socket.py", line 576, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 54] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/apundhir/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 883, in send_command
    response = connection.send_command(command)
  File "/Users/apundhir/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1040, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: An error occurred while calling o11.setCallSite
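Here the collect call failed: the traceback shows the Python process losing its Py4J connection to the JVM while the data was being transferred. When only a sample is needed, `take(n)` is enough. To walk over every element, `toLocalIterator()` (used in the word count above) fetches one partition at a time, so the driver only needs memory for the largest partition. To keep the full result, write it out with `saveAsTextFile(path)` instead of collecting it. The toy below is plain Python with hypothetical names; it shows why the iterator approach bounds memory: each partition is only materialised when the consumer reaches it.

```python
computed = []  # records which partitions have been materialised so far


def make_partition(i, size=3):
    """Hypothetical partition: a zero-argument function producing its elements."""
    def compute():
        computed.append(i)
        return list(range(i * size, (i + 1) * size))
    return compute


def to_local_iterator(partitions):
    """Toy analogue of RDD.toLocalIterator(): yield elements one partition at a time."""
    for compute_partition in partitions:
        for element in compute_partition():
            yield element


it = to_local_iterator([make_partition(i) for i in range(4)])
print(next(it), computed)  # 0 [0]: only the first partition has been computed
print(sum(it), computed)   # 66 [0, 1, 2, 3]: the rest are computed as they are consumed
```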