### Import the required libraries, then create a SparkContext

In [1]:
from pyspark.sql import SparkSession

In [6]:
# Build (or reuse) a SparkSession named 'RDD' and keep only its SparkContext,
# the entry point for the RDD API used in the rest of this notebook.
sc = (
    SparkSession.builder
    .appName('RDD')
    .getOrCreate()
    .sparkContext
)

### Create and display an RDD from the following list

In [7]:
# (str, int) pairs that the next cell distributes as an RDD.
# NOTE(review): `List` is kept because the cell below reads it; a lower-case
# name would follow PEP 8.
List = [
    ('JK', 22),
    ('V', 24),
    ('Jimin', 24),
    ('RM', 25),
    ('J-Hope', 25),
    ('Suga', 26),
    ('Jin', 27),
]

In [9]:
# Distribute the local list of (str, int) tuples across the cluster as an RDD.
rdd1 = sc.parallelize(List)

In [10]:
# How many partitions Spark split rdd1 into. No numSlices was passed, so this
# presumably comes from sc.defaultParallelism (8 in the run shown).
rdd1.getNumPartitions()

8

In [12]:
# Bring every element back to the driver; acceptable here since there are only 7.
rdd1.collect()

[('JK', 22),
 ('V', 24),
 ('Jimin', 24),
 ('RM', 25),
 ('J-Hope', 25),
 ('Suga', 26),
 ('Jin', 27)]

### Create a `sample1.txt` file containing the text shown below

In [22]:
# Sample text written to sample1.txt in the next cell. Several lines keep a
# trailing space, and the string does not end with a newline.
text = '''Utilitatis causa amicitia est quaesita.
Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Collatio igitur ista tenihil iuvat. 
Honesta oratio, Socratica, Platonis etiam. 
Primum in nostranepotestate est, quid meminerimus? 
Duo Reges: constructio interrete.
Quid, sietiam iucunda memoria est praeteritorum malorum? 
Si quidem, inquit, tollerem,'''

In [23]:
# Write the sample text explicitly as UTF-8. Spark's textFile decodes its input
# as UTF-8, whereas open() without `encoding` uses a platform-dependent default.
with open("sample1.txt", "w", encoding="utf-8") as sample_file:
    sample_file.write(text)

### Read `sample1.txt` into an RDD and display the first 4 elements

In [24]:
# Build an RDD with one element per line of the file. Displaying it shows only
# the RDD description; the file is read when an action such as take() runs.
sample1 = sc.textFile("sample1.txt")
sample1

sample1.txt MapPartitionsRDD[9] at textFile at NativeMethodAccessorImpl.java:0

In [25]:
# First 4 lines of the file, returned to the driver as a Python list.
sample1.take(4)

['Utilitatis causa amicitia est quaesita.',
 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. ',
 'Collatio igitur ista tenihil iuvat. ',
 'Honesta oratio, Socratica, Platonis etiam. ']

### Count the total number of rows in the RDD

In [26]:
# Number of elements (i.e. lines of the file) in the RDD.
sample1.count()

8

### Create a function that converts the data to lower case and splits it into words

In [29]:
def lowersplit(line):
    """Lower-case *line* and split it on runs of whitespace.

    Returns the list of tokens; punctuation stays attached to the words
    (e.g. 'quaesita.').
    """
    lowered = line.lower()
    return lowered.split()
# Apply lowersplit to every line, so each RDD element becomes a list of tokens.
sample1_lower = sample1.map(lowersplit)
# Preview the token lists of the first 5 lines.
sample1_lower.take(5)

[['utilitatis', 'causa', 'amicitia', 'est', 'quaesita.'],
 ['lorem',
  'ipsum',
  'dolor',
  'sit',
  'amet,',
  'consectetur',
  'adipiscing',
  'elit.'],
 ['collatio', 'igitur', 'ista', 'tenihil', 'iuvat.'],
 ['honesta', 'oratio,', 'socratica,', 'platonis', 'etiam.'],
 ['primum', 'in', 'nostranepotestate', 'est,', 'quid', 'meminerimus?']]

### Remove the stopwords (listed below) from the previous text

In [31]:
# Stopwords to drop from the tokenised text.
# - All entries are lower-case because the tokens are lower-cased by
#   lowersplit before filtering. Upper-case 'I' / 'I’d' could never match.
# - A set gives constant-time `in` checks inside the filter functions.
stopwords = {'a', 'all', 'the', 'as', 'is', 'am', 'an', 'and',
             'be', 'been', 'from', 'had', 'i', 'i’d', 'why', 'with'}
# Hint: you may need to use flatMap

In [49]:
%%timeit
def is_not_stopword(word):
    """Predicate for filter(): return True when *word* should be kept.

    Named as a predicate so it is not confused with the list-returning
    ``remove_stopwords`` defined in the following cells.
    """
    return word not in stopwords

# NOTE(review): this cell runs under %%timeit, so these assignments are
# presumably not kept in the notebook namespace afterwards.
sample1_no_stopwordsv1 = sample1_lower.flatMap(lambda words: filter(is_not_stopword, words))
sample1_no_stopwordsv1.collect()


66.2 ms ± 1.59 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [50]:
%%timeit
def remove_stopwords(line):
    """Return the tokens of *line* that are not in ``stopwords``, in order."""
    kept = []
    for token in line:
        if token not in stopwords:
            kept.append(token)
    return kept

sample1_no_stopwordsv2 = sample1_lower.flatMap(remove_stopwords)
sample1_no_stopwordsv2.collect()

65.8 ms ± 1.31 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [52]:
def remove_stopwords(line):
    """Drop every token of *line* found in ``stopwords``.

    Returning a list lets flatMap flatten the per-line results into a single
    RDD of words.
    """
    return [tok for tok in line if tok not in stopwords]

# Same pipeline as the timed cell above, run once more so the result is displayed.
sample1_no_stopwordsv2 = sample1_lower.flatMap(remove_stopwords)
sample1_no_stopwordsv2.collect()

['utilitatis',
 'causa',
 'amicitia',
 'est',
 'quaesita.',
 'lorem',
 'ipsum',
 'dolor',
 'sit',
 'amet,',
 'consectetur',
 'adipiscing',
 'elit.',
 'collatio',
 'igitur',
 'ista',
 'tenihil',
 'iuvat.',
 'honesta',
 'oratio,',
 'socratica,',
 'platonis',
 'etiam.',
 'primum',
 'in',
 'nostranepotestate',
 'est,',
 'quid',
 'meminerimus?',
 'duo',
 'reges:',
 'constructio',
 'interrete.',
 'quid,',
 'sietiam',
 'iucunda',
 'memoria',
 'est',
 'praeteritorum',
 'malorum?',
 'si',
 'quidem,',
 'inquit,',
 'tollerem,']

### Find the words starting with ‘c’

In [58]:
def startingwith(word, char):
    """Return True when *word* begins with the prefix *char*."""
    has_prefix = word.startswith(char)
    return has_prefix

# filter keeps only the tokens for which the predicate is True, i.e. those
# that begin with 'c'.
words_with_c = sample1_no_stopwordsv2.filter(lambda token: startingwith(token, char='c'))
words_with_c.collect()


['causa', 'consectetur', 'collatio', 'constructio']

### Reduce the data by key and sum it (use the data from the following list)

In [59]:
list = [('JK', 22), ('V', 24), ('Jimin',24), ('RM', 25)
        , ('J-Hope', 25), ('Suga', 26), ('Jin', 27)
       , ('J-Hope', 12), ('Suga', 25), ('Jin', 34)
       , ('JK', 32), ('V', 44), ('Jimin',14), ('RM', 35)]
# Hint: use reduceByKey

In [62]:
rdd2 = sc.parallelize(list)
grouped_rdd = rdd2.reduceByKey(lambda x, y: x + y)
grouped_rdd.collect()

[('JK', 54),
 ('J-Hope', 37),
 ('Suga', 51),
 ('V', 68),
 ('RM', 60),
 ('Jin', 61),
 ('Jimin', 38)]

### Create some key-value pair RDDs

In [64]:
# Two small pair RDDs: keys 'a' and 'b' appear in both, 'c' only in rdd2.
rdd1 = sc.parallelize([('a', 2), ('b', 3)])
rdd2 = sc.parallelize([('a', 9), ('b', 7), ('c', 10)])

In [65]:
# Show the contents of the first pair RDD.
rdd1.collect()

[('a', 2), ('b', 3)]

In [66]:
# Show the contents of the second pair RDD.
rdd2.collect()

[('a', 9), ('b', 7), ('c', 10)]

### Perform Join operation on the RDDs (rdd1,rdd2)

In [67]:
# join pairs up the values that share a key as (key, (left, right)); keys
# present in only one RDD (here 'c') do not appear in the result.
rdd1.join(rdd2).collect()   

[('b', (3, 7)), ('a', (2, 9))]