In [0]:
#Import Packages
from pyspark import SparkContext
sc = SparkContext("local", "First App")

###Resilient Distributed Dataset (RDD)

An RDD is an immutable, distributed collection of elements, partitioned across the nodes of your cluster so that it can be operated on in parallel. It is accessed through a low-level API that offers transformations and actions.


####Creating an RDD using parallelize

In [0]:
numbers = range(100)  # plain Python range, not yet an RDD

data = sc.parallelize(numbers)  # distribute the range as an RDD

data.count()

100

In [0]:
# Access the first 4 elements of the RDD.
data.take(4)

[0, 1, 2, 3]

####map() and flatMap() Transformations in Spark

#####map()

map() applies a function to each element of the RDD and returns a new RDD with exactly one output element per input element. If the function returns a list (for example, the words of a line), the result is an RDD of lists, one list per input element.

#####flatMap()

flatMap() also applies a function to each element, but the function is expected to return an iterable, and the items of all those iterables are flattened into a single RDD. The result is one flat collection rather than a collection of lists.

In [0]:
rdd = sc.parallelize([5,10,15])

rdd.map(lambda x: range(x)).collect()

[range(0, 5), range(0, 10), range(0, 15)]
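The cell above only exercises map(). To make the contrast with flatMap() concrete, here is a minimal plain-Python sketch of the two semantics. It does not use Spark, and `expand` and `elements` are illustrative names that do not appear in the notebook. In PySpark the corresponding call would be `rdd.flatMap(lambda x: range(x))`.

```python
from itertools import chain


def expand(x):
    # One input element -> a sequence of outputs (like lambda x: range(x)).
    return list(range(x))


elements = [2, 3]  # small stand-in for the contents of an RDD

# map: exactly one output per input, so the result is a list of lists.
mapped = [expand(x) for x in elements]

# flatMap: the per-element sequences are concatenated into one flat sequence.
flat_mapped = list(chain.from_iterable(expand(x) for x in elements))

print(mapped)       # [[0, 1], [0, 1, 2]]
print(flat_mapped)  # [0, 1, 0, 1, 2]
```

The nesting is the only difference: both versions call the same function on every element.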

####Creating RDD from a file in PySpark 

In [0]:
from google.colab import files
uploaded = files.upload()

Saving temp.txt to temp.txt


In [0]:
RDDread = sc.textFile("/content/temp.txt")


In [0]:
# View the contents of the RDD using the collect() action

RDDread.collect()


['I worked at KFC from 2001-2005 at 3 different stores and the policy was to throw all the food away at the end of the night. ',
 'I would typically give people that ordered food near closing extra (if we had a lot of food left) as it seemed like',
 'such a waste to just throw it all away. I would also try to sneak food home for my roommates when possible,',
 "which I guess was technically stealing, but at least it wasn't thrown in the trash. ",
 '',
 '',
 'I was posted in Surgery, and it was my last day of Surgery posting and since it was our OPD day, I had to',
 'stay back at night to help my PG with the casualty cases. Never before did we have so many admissions, most of',
 'them with instestinal obstruction ( with 2 having perforations). It was a really bad day. My co-interns finished their',
 'duty and I came back (those doing night duty get to take a 6hour break for some sleep) to the hospital and resumed my duties.']

In [0]:
# take(n) - returns the first n lines from the dataset

RDDread.take(4)

['I worked at KFC from 2001-2005 at 3 different stores and the policy was to throw all the food away at the end of the night. ',
 'I would typically give people that ordered food near closing extra (if we had a lot of food left) as it seemed like',
 'such a waste to just throw it all away. I would also try to sneak food home for my roommates when possible,',
 "which I guess was technically stealing, but at least it wasn't thrown in the trash. "]

In [0]:
# count() - returns the number of elements (here, lines) in an RDD

RDDread.count()

10

In [0]:
# Use map to split each line into a list of words
RDDmap = RDDread.map(lambda line : line.split(" "))
RDDmap.take(2)

[['I',
  'worked',
  'at',
  'KFC',
  'from',
  '2001-2005',
  'at',
  '3',
  'different',
  'stores',
  'and',
  'the',
  'policy',
  'was',
  'to',
  'throw',
  'all',
  'the',
  'food',
  'away',
  'at',
  'the',
  'end',
  'of',
  'the',
  'night.',
  ''],
 ['I',
  'would',
  'typically',
  'give',
  'people',
  'that',
  'ordered',
  'food',
  'near',
  'closing',
  'extra',
  '(if',
  'we',
  'had',
  'a',
  'lot',
  'of',
  'food',
  'left)',
  'as',
  'it',
  'seemed',
  'like']]

In [0]:
# Run the same split with flatMap to see how it differs from map

Rddflatmap = RDDread.flatMap(lambda line : line.split(" "))

Rddflatmap.take(2)

['I', 'worked']

**filter(func):** Returns a new dataset made up of the elements of the source for which func returns true.

Example: find the lines that contain the term 'stores'.

In [0]:
RDDfilter = RDDread.filter(lambda line : ("stores" in line.lower()))

RDDfilter.count()

1

####Sampling RDDs

Spark offers two sampling operations: the transformation sample and the action takeSample. The transformation returns a new RDD, so further transformations can be chained onto the sample. The action returns the sampled elements to the driver as a local list.

The **sample** transformation takes up to three parameters. The first is whether sampling is done with replacement. The second is the expected sample size as a fraction of the RDD. The third, which is optional, is a random seed.


In [0]:
raw_data = sc.textFile("/content/temp.txt")


raw_data_sample = raw_data.sample(False, 0.1, 1234)

sample_size = raw_data_sample.count()

total_size = raw_data.count()

print("Sample size is {} of {}".format(sample_size, total_size))

Sample size is 2 of 10
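The output above reports 2 lines even though 10 % of 10 lines is 1. Without replacement, the fraction is the probability that each element is kept, so the sample size is only an expectation. The plain-Python sketch below models that behaviour and contrasts it with a takeSample-style call that returns an exact count. It does not use Spark, the helper names are hypothetical, and Python's random generator differs from Spark's, so the same seed will not select the same lines.

```python
import random


def sample_without_replacement(elements, fraction, seed):
    # Keep each element independently with probability `fraction`,
    # so the result size varies around fraction * len(elements).
    rng = random.Random(seed)
    return [e for e in elements if rng.random() < fraction]


def take_sample(elements, num, seed):
    # Return exactly `num` elements (or all of them if fewer exist).
    rng = random.Random(seed)
    return rng.sample(elements, min(num, len(elements)))


lines = ["line {}".format(i) for i in range(10)]

approx = sample_without_replacement(lines, 0.1, 1234)
exact = take_sample(lines, 3, 1234)

print(len(approx))  # anywhere from 0 to 10, 1 on average
print(len(exact))   # always 3
```

When an exact number of elements is needed in local memory, the takeSample action is the closer fit. The sample transformation suits cases where more distributed processing follows.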


####union() Transformation

union() merges two RDDs into one that contains every element of both. It is most useful when the two RDDs share the same structure.

In [0]:
marks_a = [("physics",70),("maths",60),("chemistry",95)]

marks_b = [("physics",80),("maths",55),("chemistry",85)]

rdd_a = sc.parallelize(marks_a)

rdd_b = sc.parallelize(marks_b)

rdd_a.union(rdd_b).collect()

[('physics', 70),
 ('maths', 60),
 ('chemistry', 95),
 ('physics', 80),
 ('maths', 55),
 ('chemistry', 85)]
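One detail that the output above does not reveal is that union() keeps duplicates; removing them is a separate step (distinct() in Spark). The following plain-Python sketch models this with list concatenation. The data is made up for illustration and deliberately contains a repeated pair. Unlike this sketch, Spark's distinct() does not promise any particular ordering.

```python
marks_a = [("physics", 70), ("maths", 60)]
marks_b = [("physics", 70), ("maths", 55)]  # ("physics", 70) repeats

# union: every element of both inputs, duplicates included.
combined = marks_a + marks_b

# De-duplication is a separate step; dict.fromkeys keeps first occurrences.
deduplicated = list(dict.fromkeys(combined))

print(combined)
# [('physics', 70), ('maths', 60), ('physics', 70), ('maths', 55)]
print(deduplicated)
# [('physics', 70), ('maths', 60), ('maths', 55)]
```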

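####join() Transformation

join() combines two RDDs of (key, value) pairs on their common keys. For each key present in both RDDs, it returns (key, (value_a, value_b)).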

In [0]:
Subject_wise_marks = rdd_a.join(rdd_b)

Subject_wise_marks.collect()

[('physics', (70, 80)), ('maths', (60, 55)), ('chemistry', (95, 85))]
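In the example above every key appears exactly once in each RDD. The plain-Python sketch below models the inner-join semantics for the less tidy cases: a key missing from one side is dropped, and a repeated key yields one pair per matching combination. `inner_join` is a hypothetical helper, not a Spark function, and the data is made up. Spark does not guarantee the order of the joined result.

```python
from collections import defaultdict


def inner_join(left, right):
    # Index the right-hand values by key.
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Emit one (key, (left_value, right_value)) pair per matching combination.
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]


marks_a = [("physics", 70), ("maths", 60), ("biology", 90)]
marks_b = [("physics", 80), ("physics", 75), ("maths", 55)]

print(inner_join(marks_a, marks_b))
# [('physics', (70, 80)), ('physics', (70, 75)), ('maths', (60, 55))]
```

Here "biology" has no partner in `marks_b`, so it does not appear in the result, while "physics" appears twice because it has two values on the right-hand side.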