In [0]:
# Creating RDDs 
# RDDs can be created from in-memory collections, or from data stored in external data sources (e.g., files on HDFS).  
# The example below creates an RDD from an in-memory collection:

In [0]:
# RDD(Resilient Distributed Dataset)
wordsRDD=sc.parallelize(["fish","cat","dog"])

In [0]:
wordsRDD

In [0]:
# Provide an instance of an iterable object (sequence)
# as argument. Note that the Spark Context is automatically 
# instantiated by the notebook, and is bound to the 'sc' variable.
# It will need to be created explicitly for a standalone program.
rdd = sc.parallelize(range(30))
rdd

In [0]:
# Spark Context can also be accessed through the Spark Session object, which is
# automatically created and bound to the variable 'spark'
rdd1 = spark.sparkContext.parallelize(range(3, 5))
rdd1

In [0]:
# Collect
# An RDD does not live in the local memory of a driver program (unless in local mode), but rather is broken into pieces and distributed across a cluster. While there, it undergoes a series of transformations as prescribed by the driver program (which is converted into a lineage graph, or DAG). The transformations are not actually executed until an action is called. An action causes data to be produced and materialized in either the driver program memory, or an external store (e.g., HDFS).

# The collect() action below causes the RDD content to be shipped back to a driver program and materialized in its memory as a list. Do not call it unless you are sure your RDD is sufficiently small!

In [0]:
result = rdd.collect()
print(type(result))
print(result)

In [0]:
result = rdd1.collect()
print(type(result))
print(result)

In [0]:
# Map
# Map transformation is one of the most commonly used. As in Python, it applies a given function to every element of an RDD, and returns the resulting RDD. Note that this new RDD is not actual data! It is just an abstract handle encapsulating the transformation outcome that we can use to invoke another transformation or action.

In [0]:
print(rdd.map(lambda x: x*x).collect())

In [0]:
# Reduce
# Reduce is a commonly used action. It takes an RDD as input and returns a single reduced value. As in Python, to obtain the result, it repeatedly applies a reduce operator to the elements of an RDD. The reduce operator is a function that takes two elements as input and returns one as output.

# Workers execute reduce in parallel: each executor reduces the data local to it, and the results from all executors are then combined. It is therefore extremely important that the reduce operator is both commutative and associative, as the execution order cannot be known in advance.
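
In [0]:
# A minimal plain-Python sketch (no Spark needed) of why the operator must be associative and commutative.
# The helper partitioned_reduce is hypothetical: it only imitates "reduce each partition, then combine the partial results".

```python
from functools import reduce

def partitioned_reduce(data, num_partitions, op):
    """Reduce each chunk locally, then reduce the partial results."""
    size = -(-len(data) // num_partitions)  # ceiling division
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(op, chunk) for chunk in chunks]
    return reduce(op, partials)

data = list(range(1, 9))

add = lambda x, y: x + y
sub = lambda x, y: x - y

# Addition is associative and commutative: the partitioning does not matter.
sums = {partitioned_reduce(data, k, add) for k in (1, 2, 4)}

# Subtraction is neither: the result depends on how the data was split.
diffs = {partitioned_reduce(data, k, sub) for k in (1, 2, 4)}

print(sums)
print(diffs)
```

# With addition every partitioning gives the same total; with subtraction each partitioning gives a different answer.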

In [0]:
# Reduce rdd to the sum of its elements
rdd.reduce(lambda x,y:x+y)

In [0]:
# Here is a slightly more elaborate example. Try to figure out what this code is doing and what the result is going to be. Then run the code to check yourself.

In [0]:
words=['this','is','the','best','linux','ever','Saikiran']
wordRDD=sc.parallelize(words)
wordRDD.reduce(lambda w,v: w if len(w)>len(v) else v)

In [0]:
# Simple Map and Reduce pipelines
# Compute the sum of squares. Sequential syntax:

In [0]:
Squares=rdd.map(lambda x:x*x)
Squares.reduce(lambda x,y:x+y)

In [0]:
# Cascading is more compact and "functional":
rdd.map(lambda x:x*x).reduce(lambda x,y:x+y)

In [0]:
# Extracting basic information about an RDD
# RDDs typically have hundreds of thousands of elements, so it usually makes no sense to print out the content of a whole RDD. Here are some ways to get manageable amounts of information about an RDD.

# Create an RDD of length n which is a repetition of the pattern 1,2,3,4

In [0]:
n = 1000000
rdd = sc.parallelize([1,2,3,4]*int(n/4))

In [0]:
# Use the count() action to find the number of elements in the RDD
rdd.count()

In [0]:
# Get the first few elements of an RDD
# Both first() and take() are actions
print('first element=', rdd.first())
print('first 5 elements = ', rdd.take(5))

In [0]:
# Sampling an RDD
# RDDs are often very large.
# Aggregates, such as averages, can be approximated efficiently by using a sample.
# Sampling is done in parallel and requires limited computation.


# The transformation sample(withReplacement, p) generates a sample of the elements of the RDD, where:

# withReplacement is a boolean flag indicating whether an element in the RDD can be sampled more than once.
# p is the probability of accepting each element into the sample. Note that because elements are accepted independently (and the sampling is performed separately in each partition), the number of elements in the sample changes from sample to sample.
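
In [0]:
# A plain-Python sketch of per-element sampling without replacement, assuming each element is kept by an independent coin flip with probability p.
# The helper bernoulli_sample is hypothetical and only illustrates why the sample size varies; it is not Spark's implementation.

```python
import random

def bernoulli_sample(data, p, seed=None):
    """Keep each element independently with probability p."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < p]

data = [1, 2, 3, 4] * 2500          # 10,000 elements
sizes = [len(bernoulli_sample(data, 0.1, seed=s)) for s in range(5)]
print(sizes)                         # sizes cluster around the expected value p * n = 1000
```

# Fixing the seed makes a given sample reproducible, which is what the optional seed argument of sample() is for.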

In [0]:
# get a sample whose expected size is m
# Note that the size of the sample is different in different runs
m=5.
print('sample1=', rdd.sample(False, m/n).collect()) 
print('sample2=', rdd.sample(False, m/n).collect())


In [0]:
# Filtering an RDD
# The transformation filter(func) returns a new RDD formed by selecting those elements of the source RDD on which func returns true

In [0]:
print('the number of elements in rdd that are > 3 =', rdd.filter(lambda n: n > 3).count())

In [0]:
# Removing duplicate elements from an RDD
# The transformation distinct() returns a new RDD that contains the distinct elements of the source RDD.

# This operation requires a shuffle in order to detect duplication across partitions (we'll discuss the implications of this later in the class).

In [0]:
# Remove the duplicate elements in DuplicateRDD to get an RDD of distinct values
DuplicateRDD = sc.parallelize([1, 1, 2, 2, 3, 3])
print('DuplicateRDD=', DuplicateRDD.collect())
print('DistinctRDD = ', DuplicateRDD.distinct().collect())

In [0]:
# Flat Map
# The transformation flatMap(func) is similar to map, but the result is "flattened" before being returned.

In [0]:
text=["you are my sunshine","my only sunshine"]
text_file = sc.parallelize(text)

# map each line in text to a list of words
print('map:',text_file.map(lambda line: line.split(" ")).collect())

# create a single list of words by combining the words from all of the lines
print('flatmap:',text_file.flatMap(lambda line: line.split(" ")).collect())

In [0]:
# Exercise 1
# Write a Spark program that uses the map() transformation followed by the collect() action to compute and output the list of cubes of integer numbers in a given range.

In [0]:
# A sample range of numbers. Experiment with different ones.
r = range(5)

# Create an RDD from r
numsRDD = sc.parallelize(r)

# Use map to convert it to an RDD consisting of a sequence of cubes, followed by collect() to materialize the result
# as a list
numsRDD.map(lambda x: x*x*x).collect()


In [0]:
# Exercise 2
# Write a Spark program that uses filter() followed by collect() to output the list of all objects which are positive integers in the given list of objects.

In [0]:
# Sample input list of objects
lst = [1234, '666', 'hi, there!', -23, 0, 'bye', 1]
# Your code goes here
lstRDD = sc.parallelize(lst)
lstRDD.filter(lambda x: type(x) is int and x > 0).collect()

In [0]:
# Exercise 3
# Combine the filter() transformation from the previous exercise with the map() transformation to obtain the list of cubes of all positive numbers found in an input list of objects.

In [0]:
# Sample input list of objects
lst = [1234, '666', 'hi, there!', -23, 0, 'bye', 1]
# Your code goes here
lstRDD.filter(lambda x: type(x) is int and x > 0).map(lambda x : x*x*x).collect()

In [0]:
# Exercise 4
# Given an in-memory collection of words, write a Spark program that computes and outputs the longest one.
# A sample collection. Feel free to modify as you wish.
words=['you', 'are', 'my', 'sunshine', 'my', 'only', 'sunshine','Saikiranlearnspython']
# Your code goes here
sc.parallelize(words).reduce(lambda p, w: p if len(p) > len(w) else w)

In [0]:
# Exercise 5
# Given an in-memory collection of strings, write a Spark program that computes the average of their length. Hint: One possible implementation will first map every word to its length, reduce it to the sum, and then divide it by the length of the list. Note that for larger datasets, it is better to use the count() action to count the number of elements.

In [0]:
# A sample collection. Feel free to modify as you wish.
words=['you', 'are', 'my', 'sunshine', 'my', 'only', 'sunshine']
# Your code goes here
wordsRDD = sc.parallelize(words)
# Named 'total' rather than 'sum' so that Python's built-in sum() is not shadowed
total = wordsRDD.map(lambda x: len(x)).reduce(lambda acc, y: acc+y)
print(total)
cnt = wordsRDD.count()
avg = total / cnt
print('Avg = {0:.1f}'.format(avg))
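
In [0]:
# The solution above runs two jobs, one for reduce() and one for count(). A possible alternative is to carry (sum, count) pairs through a single reduce.
# The sketch below shows the idea in plain Python with functools.reduce; with an RDD, the same pair-building and pair-adding functions could presumably be passed to map() and reduce().

```python
from functools import reduce

words = ['you', 'are', 'my', 'sunshine', 'my', 'only', 'sunshine']

# Pair each length with a count of 1, then add the pairs component-wise.
pairs = [(len(w), 1) for w in words]
total, count = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), pairs)
print(total, count, total / count)
```

# Component-wise addition of pairs is associative and commutative, so it satisfies the requirement on reduce operators.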

In [0]:
# Exercise 6
# Modify the above program to only include the words longer than 2 characters. Hint: Use filter transformation.

In [0]:
# A sample collection. Feel free to modify as you wish.
words=['you', 'are', 'my', 'sunshine', 'my', 'only', 'sunshine']
# Your code goes here
lensRDD = sc.parallelize(words).map(lambda x: len(x)).filter(lambda x: x > 2)
# Named 'total' rather than 'sum' so that Python's built-in sum() is not shadowed
total = lensRDD.reduce(lambda acc, y: acc+y)
cnt = lensRDD.count()
avg = total / cnt
print('Avg = {0:.1f}'.format(avg))

In [0]:
# Exercise 7
# Repeat Exercise 4 but assume the input is a list of sentences, and not individual words. Hint: Use the flatMap() transformation discussed above to flatten the list of words returned by split(), and then apply reduce() as in Exercise 4. Do the same for Exercises 5 and 6.

In [0]:
import pyspark
# A sample collection. Feel free to modify as you wish.
text=["you are my sunshine","my only sunshine"]
# Your code goes here

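wordsRDD = sc.parallelize(text).map(lambda s: s.split())
# Flatten the per-sentence word lists, then keep the longer word at each reduce step
longest = wordsRDD.flatMap(lambda s: s).reduce(lambda p, w: p if len(p) > len(w) else w)
print(longest)
print(wordsRDD.collect())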

# toDebugString is useful to print out the lineage graph for debugging.
print(wordsRDD.toDebugString().decode('utf-8'))

# sometimes useful to print out help info
help(pyspark.RDD.flatMap)

In [0]:
input_data = ["Python Pool",
        "Latracal Solutions",
        "Python pool is best",
        "Basic command in python"]
rdd=spark.sparkContext.parallelize(input_data)
rdd2=rdd.flatMap(lambda x: x.split(" "))
# collect() already returns a Python list
rdd2.collect()

In [0]:
text=["you are my sunshine","my only sunshine"]
rdd = sc.parallelize(text)
rdd.flatMap(lambda x: x.split())\
     .map(lambda x: len(x))\
     .map(lambda x: (x, 1))\
     .reduceByKey(lambda x, y: x + y)\
     .collect()

In [0]:
text=["you are my sunshine","my only sunshine"]
rdd = sc.parallelize(text)
rdd.flatMap(lambda x: x.split()).map(lambda x: len(x)).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).collect()

In [0]:
# Flat file storage
rdd = sc.textFile("dbfs:/FileStore/shared_uploads/csaikiran482@gmail.com/blogtexts")

In [0]:
rdd.take(5)

In [0]:
def Func(lines):
    # Lower-case the line, then split it into a list of words on whitespace
    lines = lines.lower()
    lines = lines.split()
    return lines
rdd1 = rdd.map(Func)
rdd1.take(5)

In [0]:
# The output is too long to show in full, so only a snippet is attached. We can also see that the output is not flat (it is a nested list). To get flat output, we need a transformation that flattens the result; “flatMap” does exactly that.

# The “flatMap” transformation returns a new RDD by first applying a function to every element of this RDD and then flattening the results. This flattening is the main difference between “flatMap” and “map”. Let’s apply “flatMap” to “rdd”, store the result in “rdd2”, and print its first elements.

rdd2 = rdd.flatMap(Func)
rdd2.take(5)

In [0]:
# Transformation: filter
# Q2: Next, I want to remove the words that are not needed to analyze this text. These are called “stop words”; they add little value to a text. For example, “is”, “am”, “are” and “the” are stop words.

# Solution: To remove the stop words, we can use the “filter” transformation, which returns a new RDD containing only the elements that satisfy the given condition(s). Let’s apply “filter” to “rdd2”, keep the words that are not stop words, and store the result in “rdd3”. To do that:

#  Define the list of stop words in a variable called “stopwords” (here I am selecting only a few stop words rather than a complete list).
#  Apply “filter” to “rdd2”, checking whether each word of “rdd2” is in the “stopwords” list.
#  Check the first 10 elements of “rdd3” by applying the take action.

In [0]:
stopwords = ['is','am','are','the','for','a']
rdd3= rdd2.filter(lambda x: x not in stopwords)
rdd3.take(10)

In [0]:
# Transformation: groupBy
# Q3: After getting the results into rdd3, we want to group the words in rdd3 by the letters they start with. For example, suppose I want to group the words of rdd3 by their first 3 characters.

# Solution: The “groupBy” transformation groups the data in the original RDD. It creates a set of key-value pairs, where the key is the output of a user function and the value is all the items for which the function yields this key.

# We have to pass a function (in this case, a lambda function) to “groupBy” that takes the first 3 characters of each word in “rdd3”.
# The key is the first 3 characters, and the value is all the words that start with these 3 characters.
# After applying “groupBy”, we store the transformed result in “rdd4” (remember, RDDs are immutable). To view “rdd4”, we can print its first few (key, value) elements.

In [0]:
rdd4 = rdd3.groupBy(lambda w: w[0:3])

In [0]:
# Materialize each group's values (v), not the characters of the key
print([(k, list(v)) for (k,v) in rdd4.take(10)])
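
In [0]:
# To make the shape of a groupBy result concrete, here is a plain-Python sketch of the same grouping rule.
# The helper group_by and the word list are made up for illustration; in Spark the values arrive as an iterable per key rather than a list.

```python
from collections import defaultdict

def group_by(items, key_func):
    """Collect items into lists keyed by key_func(item)."""
    groups = defaultdict(list)
    for item in items:
        groups[key_func(item)].append(item)
    return dict(groups)

words = ['spark', 'sparse', 'apache', 'spa', 'apart']   # made-up sample words
grouped = group_by(words, lambda w: w[0:3])
print(grouped)
```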

In [0]:
# Transformation: groupByKey / reduceByKey 
# Q4: What if we want to calculate how many times each word occurs in the corpus?

# Solution: We can apply the “groupByKey” / “reduceByKey” transformations to a (key,val) pair RDD. “groupByKey” groups the values for each key in the original RDD. It creates a new pair, where the original key corresponds to this collected group of values.

# To use “groupByKey” / “reduceByKey” to find the frequency of each word, you can follow the steps below:

# A (key,val) pair RDD is required. In this pair RDD, the key is the word and the val is 1 for each word (the 1 stands for a single occurrence of that word in “rdd3”).
# To apply “groupByKey” / “reduceByKey” to “rdd3”, we first need to convert “rdd3” to a (key,val) pair RDD.

# Let’s see how to convert “rdd3” to a new mapped (key,val) RDD. We can then apply the “groupByKey” / “reduceByKey” transformation to this RDD.

In [0]:
rdd3_mapped = rdd3.map(lambda x: (x,1))
rdd3_grouped = rdd3_mapped.groupByKey()
print(list((j[0], list(j[1])) for j in rdd3_grouped.take(5)))

In [0]:
# After seeing the result of the above code, I rechecked the corpus to see how many times the word ‘manager’ appears, and found that it is written more than once. I also found variants such as ‘manager.’, ‘manager,’ and ‘manager:’. Let’s filter for ‘manager,’ in “rdd3”.

In [0]:
rdd3.filter(lambda x: x == 'manager,').collect()

In [0]:
# The output above shows that the corpus contains several variants of ‘manager’ that differ only in punctuation. To overcome this we can do several things. We could apply a regular expression to remove unnecessary punctuation from the words. For the purpose of this article, I am skipping that part.
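
In [0]:
# The article skips the clean-up, but one possible sketch is shown here, assuming that only leading and trailing punctuation should be removed.
# The function name strip_punctuation and the regular expression are my own choices, not part of the original notebook.

```python
import re

def strip_punctuation(word):
    """Remove non-word characters from both ends of a token."""
    return re.sub(r"^\W+|\W+$", "", word)

tokens = ['manager', 'manager,', 'manager.', 'manager:']
print([strip_punctuation(t) for t in tokens])
```

# In the notebook this function could be applied with rdd3.map(strip_punctuation), followed by a filter that drops any tokens that end up empty.

In [0]: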

# Until now we have not calculated the frequency (count) of each word. Let’s proceed:

In [0]:
rdd3_freq_of_words = rdd3_grouped.mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(False)

In [0]:
# In the above code, I first applied the “mapValues” transformation to “rdd3_grouped”. “mapValues” (only applicable to pair RDDs) is like “map” (which can be applied to any RDD), with one difference: a function passed to “map” on a pair RDD sees both the key and the value, whereas “mapValues” applies the function to the values only and leaves the keys untouched. In the code above I applied sum, which adds up the 1s for each word and so gives its count.

# After “mapValues”, I want to sort the words by frequency. To do that, I first convert each (word, frequency) pair to (frequency, word) so that the key and value are interchanged, then sort by key in descending order and store the result in “rdd3_freq_of_words”. Applying the “take” action shows the 10 most frequent words in my previous blog.

In [0]:
rdd3_freq_of_words.take(10)

In [0]:
# The output above shows that I used the word ‘spark’ 69 times and ‘apache’ 52 times in my previous blog.

# We can also use the “reduceByKey” transformation to count the frequency of each word in a (key,value) pair RDD. Let’s see how:

In [0]:
rdd3_mapped.reduceByKey(lambda x,y: x+y).map(lambda x:(x[1],x[0])).sortByKey(False).take(10)
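
In [0]:
# A plain-Python sketch contrasting the two counting routes on a made-up word list: collect all values per key and then sum (groupByKey style), versus folding each value into a running total (reduceByKey style).
# Both give the same counts. In Spark, reduceByKey generally combines values within each partition before the shuffle, so it tends to move less data.

```python
from collections import defaultdict

pairs = [(w, 1) for w in ['spark', 'apache', 'spark', 'rdd', 'spark']]  # made-up words

# groupByKey style: collect every value per key, then sum each group.
groups = defaultdict(list)
for key, value in pairs:
    groups[key].append(value)
counts_grouped = {key: sum(values) for key, values in groups.items()}

# reduceByKey style: fold each value into a running total per key.
counts_reduced = {}
for key, value in pairs:
    counts_reduced[key] = counts_reduced[key] + value if key in counts_reduced else value

# Swap to (count, word) and sort in descending order, mirroring sortByKey(False).
ranked = sorted(((c, w) for w, c in counts_reduced.items()), reverse=True)
print(ranked)
```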

In [0]:
# Transformation: mapPartitions
# Q5: How do I perform a task (say, counting the words ‘spark’ and ‘apache’ in rdd3) separately on each partition and get the output of the task for each partition?
# Solution: We can do this by applying the “mapPartitions” transformation. “mapPartitions” is like “map”, but the function runs once per partition of an RDD and receives an iterator over that partition's elements. So, to count the frequencies of the words ‘spark’ and ‘apache’ in each partition, you can follow these steps:

# Create a function called “func” that counts the frequencies of these words.
# Then pass the function defined in the first step to the “mapPartitions” transformation.

In [0]:
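def func(iterator):
    # Count occurrences of 'spark' and 'apache' within a single partition
    count_spark = 0
    count_apache = 0
    for i in iterator:
        if i == 'spark':
            count_spark = count_spark + 1
        if i == 'apache':
            count_apache = count_apache + 1
    # mapPartitions iterates over the returned tuple, so each partition
    # contributes two elements: the 'spark' count, then the 'apache' count
    return (count_spark, count_apache)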

In [0]:
rdd3.mapPartitions(func).glom().collect()

In [0]:
rdd3.mapPartitions(func).collect()
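
In [0]:
# A plain-Python imitation of mapPartitions, to show how the function's output is consumed. The helper map_partitions and the sample partitions are hypothetical.
# It compares returning a tuple (whose items become separate elements) with yielding the tuple (which keeps it as one element per partition).

```python
def map_partitions(partitions, f):
    """Apply f to an iterator over each partition and collect what it produces."""
    return [list(f(iter(part))) for part in partitions]

def count_return(iterator):
    words = list(iterator)
    return (words.count('spark'), words.count('apache'))    # the tuple gets iterated

def count_yield(iterator):
    words = list(iterator)
    yield (words.count('spark'), words.count('apache'))     # the tuple stays one element

partitions = [['spark', 'apache', 'spark'], ['apache', 'rdd']]   # made-up partitions
by_return = map_partitions(partitions, count_return)
by_yield = map_partitions(partitions, count_yield)
print(by_return)
print(by_yield)
```

# The nested lists play the role of glom(): one inner list per partition.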

In [0]:
# Math / Statistical Transformation
# Transformation: sample
# Q6: What if I want to work with samples instead of the full data?
# Solution: The “sample” transformation lets us take a sample instead of working on the full data. The sample method returns a new RDD containing a statistical sample of the original RDD.
# We can pass the following arguments to the sample operation:

# “withReplacement = True” or False (to choose the sample with or without replacement)
# “fraction = x” (x = .4 means we want to choose roughly 40% of the data in the RDD) and “seed” to make the results reproducible.

In [0]:
rdd3_sampled = rdd3.sample(False, 0.4, 42)
print(len(rdd3.collect()),len(rdd3_sampled.collect()))

In [0]:
# Set Theory / Relational Transformation
# Transformation: union
# Q 7: What if I want to create an RDD which contains all the elements (a.k.a. the union) of two RDDs?
# Solution: To do so, we can use the “union” transformation on two RDDs. In Spark, “union” returns a new RDD containing the elements of both RDDs. Please note that duplicate items are not removed in the new RDD. To illustrate this:

# I first create two sample RDDs (say sample1, sample2) from “rdd3” by taking a 20% sample for each.
# Then I apply a union transformation to sample1 and sample2.

In [0]:
# Both samples use the same fraction and seed (42)
sample1 = rdd3.sample(False, 0.2, 42)
sample2 = rdd3.sample(False, 0.2, 42)
union_of_sample1_sample2 = sample1.union(sample2)
print(len(sample1.collect()), len(sample2.collect()), len(union_of_sample1_sample2.collect()))

In [0]:
# From the above output, we can see that the “sample1”, “sample2” both have 914 elements each. And in the “union_of_sample1_sample2”, we have 1828 elements which shows that union operation didn’t remove the duplicate elements.

In [0]:
# Transformation: join
# Q 8: How do we join two pair RDDs based on their key?
# Solution: The “join” transformation joins two pair RDDs on their keys. To show that:

# First create two sample (key,value) pair RDDs (“sample1”, “sample2”) from “rdd3_mapped”, the same way as for the “union” transformation.
# Then apply a “join” transformation to “sample1” and “sample2”.

In [0]:
sample1 = rdd3_mapped.sample(False,.2,42)
sample2 = rdd3_mapped.sample(False,.2,42)
join_on_sample1_sample2 = sample1.join(sample2)
join_on_sample1_sample2.take(2)
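
In [0]:
# To show the shape of a join result, here is a plain-Python sketch of an inner join on keys. The helper inner_join and the sample pairs are made up for illustration.
# Each output element has the form (key, (left_value, right_value)), and only keys present on both sides appear.

```python
def inner_join(left, right):
    """Pair up values from two lists of (key, value) tuples that share a key."""
    return [(k1, (v1, v2)) for k1, v1 in left for k2, v2 in right if k1 == k2]

left = [('spark', 1), ('apache', 1), ('spark', 1)]     # made-up pairs
right = [('spark', 1), ('rdd', 1)]
joined = inner_join(left, right)
print(joined)
```

# A key that occurs m times on one side and n times on the other produces m * n joined elements, which is why joining word-level pairs can grow quickly.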

In [0]:
# Transformation: distinct
# Q 9: How do we find the distinct elements in an RDD?
# Solution: We can apply the “distinct” transformation to an RDD to get its distinct elements. Let’s see how many distinct words we have in “rdd3”.

In [0]:
rdd3_distinct = rdd3.distinct()
len(rdd3_distinct.collect())

In [0]:
# Data Structure / I/O Transformation
# Transformation: coalesce
# Q 10: What if I want to reduce the number of partitions of an RDD and get the result in a new RDD?
# Solution: We will use the “coalesce” transformation here. To demonstrate that:

# 1. Let’s first check the number of partitions in rdd3.

In [0]:
rdd3.getNumPartitions()

In [0]:
# 2. Now apply the coalesce transformation to “rdd3”, store the result in “rdd3_coalesce”, and check the number of partitions.

In [0]:
rdd3_coalesce = rdd3.coalesce(1)
rdd3_coalesce.getNumPartitions()

In [0]:
# In some of the previous transformation examples I already used actions on different RDDs to print results. For example, “take” to print the first n elements of an RDD, “getNumPartitions” to find out how many partitions an RDD has, and “collect” to print all elements of an RDD.

# Now I will go through a few more actions to demonstrate how we can get results.

In [0]:
# General Actions
# Action: getNumPartitions
# Q 11: How do I find out the number of partitions in an RDD?

# Solution: With “getNumPartitions”, we can find out how many partitions exist in our RDD. (Strictly speaking, it reads partition metadata and does not launch a job, but it is grouped with the actions here.) Let’s see how many partitions “rdd3” has.

In [0]:
rdd3.getNumPartitions()

In [0]:
# Action: Reduce
# Q 12: How do I find the sum of all the numbers in an RDD?

# Solution: To demonstrate this, I will:

# First create an RDD called “num_rdd” from the numbers 1 through 999 (range(1,1000) excludes the stop value).
# Use a reduce action and pass a function to it (lambda x,y:  x+y).
# A reduce action is used for aggregating all the elements of an RDD by applying a pairwise user function.

In [0]:
num_rdd = sc.parallelize(range(1,1000))
num_rdd.reduce(lambda x,y: x+y)

In [0]:
# In the code above, I first created an RDD (“num_rdd”) from the range and then applied a reduce action to it to sum all the numbers in “num_rdd”.
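
In [0]:
# A quick plain-Python cross-check of the expected result, using the closed form n(n+1)/2 for the sum of 1 through n.
# This runs locally with functools.reduce rather than on the RDD, so it only confirms the arithmetic.

```python
from functools import reduce

numbers = range(1, 1000)                       # 1 through 999; the stop value is excluded
total = reduce(lambda x, y: x + y, numbers)
closed_form = 999 * (999 + 1) // 2             # n(n+1)/2 with n = 999
print(total, closed_form)
```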

In [0]:
# Mathematical / Statistical Actions
# Action: count
# Q 13: Count the number of elements in RDD.

# Solution: The count action will count the number of elements in RDD. To see that, let’s apply count action on “rdd3” to count the number of words in "rdd3".

In [0]:
rdd3.count()

In [0]:
# Action: max, min, sum, variance and stdev
# To take the maximum, minimum, sum, variance and standard deviation of a RDD, we can apply “max”, “min”, “sum”, “variance” and “stdev” actions. Let’s take the maximum, minimum, sum, variance and standard deviation of “num_rdd”.

In [0]:
num_rdd.max(),num_rdd.min(), num_rdd.sum(),num_rdd.variance(),num_rdd.stdev()
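
In [0]:
# A plain-Python sketch of the difference between population and sample variance for the same numbers, using the standard statistics module.
# To my understanding, RDD.variance() and RDD.stdev() are the population versions, with sampleVariance() and sampleStdev() as the n - 1 counterparts; treat that mapping as an assumption to verify against the PySpark documentation.

```python
import statistics

values = list(range(1, 1000))
n = len(values)

population_var = statistics.pvariance(values)   # divides by n
sample_var = statistics.variance(values)        # divides by n - 1
print(population_var, sample_var)
```

# For the integers 1 through n the population variance has the closed form (n^2 - 1) / 12, which gives a value to compare the notebook output against.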