### Import the required libraries, then create the SparkContext

In [8]:
import os

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session in local mode (`local[*]` master).
spark = (
    SparkSession.builder
    .appName("MyApp")
    .master("local[*]")
    .getOrCreate()
)

# Keep a direct handle on the session's underlying SparkContext.
sc = spark.sparkContext

In [2]:
# Bare expression: the notebook shows the session object as this cell's output.
spark

In [9]:
# Make sure the "Dataset" folder exists; exist_ok=True keeps this cell safe to re-run.
os.makedirs("Dataset", exist_ok=True)

### Create a sample1.txt file to contain the text shown below.

In [11]:
# Write the sample text that the following cells load.
# The encoding is pinned to UTF-8 so the bytes on disk do not depend on the
# platform's default locale encoding.
# NOTE: the literal starts with a newline right after the opening quotes, so
# the file begins with an empty line (it shows up as the first, empty row once
# the file is loaded).
with open("Dataset/sample1.txt", "w", encoding="utf-8") as f:
    f.write('''
Utilitatis causa amicitia est quaesita.
Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Collatio igitur ista tenihil iuvat. 
Honesta oratio, Socratica, Platonis etiam. 
Primum in nostranepotestate est, quid meminerimus? 
Duo Reges: constructio interrete.
Quid, sietiam iucunda memoria est praeteritorum malorum? 
Si quidem, inquit, tollerem,''')

### Read the sample1.txt file into an RDD and display the first 4 elements

In [48]:
# Read the text file through the DataFrame reader and take its underlying RDD.
# Each element is a Row exposing the line's text as `value`.
rdd = spark.read.text("Dataset/sample1.txt").rdd

In [15]:
# Preview the first four elements of the RDD.
rdd.take(4)

[Row(value=''),
 Row(value='Utilitatis causa amicitia est quaesita.'),
 Row(value='Lorem ipsum dolor sit amet, consectetur adipiscing elit. '),
 Row(value='Collatio igitur ista tenihil iuvat. ')]

### Count the total number of rows in RDD

In [18]:
# Report how many elements the RDD holds.
print(f"Total number of rows : {rdd.count()}")

Total number of rows : 9


### Create a function to convert the data to lower case and split it

In [21]:
# Lower-case the text held in the first field of every element.
# NOTE: the lines are only lower-cased here; they are not split into words.
rdd_lower = rdd.map(lambda row: row[0].lower())

# Preview the first five lower-cased lines.
rdd_lower.take(5)

['',
 'utilitatis causa amicitia est quaesita.',
 'lorem ipsum dolor sit amet, consectetur adipiscing elit. ',
 'collatio igitur ista tenihil iuvat. ',
 'honesta oratio, socratica, platonis etiam. ']

### Remove the stopwords from the previous text (i.e. filter them out).

In [49]:
# Words to drop from the text. Some entries are capitalised here; the next
# cell lower-cases them before they are used for filtering.
stopwords = [
    'a', 'all', 'the', 'as',
    'is', 'am', 'an', 'and',
    'be', 'been', 'from', 'had',
    'I', 'I’d', 'why', 'with',
]

In [50]:
# Normalise the stopwords to lower case and keep them in a set for membership tests.
stopwords = {entry.lower() for entry in stopwords}

# Split every line on whitespace, then keep only the tokens whose lower-cased
# form is not a stopword. Tokens keep their original case and punctuation.
words_rdd = (
    rdd.flatMap(lambda row: row.value.split())
    .filter(lambda token: token.lower() not in stopwords)
)



In [51]:
# Bring every remaining word back to the driver and print them as one list.
print(words_rdd.collect())

['Utilitatis', 'causa', 'amicitia', 'est', 'quaesita.', 'Lorem', 'ipsum', 'dolor', 'sit', 'amet,', 'consectetur', 'adipiscing', 'elit.', 'Collatio', 'igitur', 'ista', 'tenihil', 'iuvat.', 'Honesta', 'oratio,', 'Socratica,', 'Platonis', 'etiam.', 'Primum', 'in', 'nostranepotestate', 'est,', 'quid', 'meminerimus?', 'Duo', 'Reges:', 'constructio', 'interrete.', 'Quid,', 'sietiam', 'iucunda', 'memoria', 'est', 'praeteritorum', 'malorum?', 'Si', 'quidem,', 'inquit,', 'tollerem,']


### Find the words starting with ‘c’

In [52]:
# Keep only the words whose first character is a lower-case 'c'.
# The check is case-sensitive, so capitalised words such as 'Collatio' are left out.
words_starting_with_c = words_rdd.filter(lambda token: token.startswith('c'))
print(words_starting_with_c.collect())

['causa', 'consectetur', 'constructio']


### Reduce the data by key and sum it (use the data from the following list)

In [53]:
list = [('JK', 22), ('V', 24), ('Jimin',24), ('RM', 25)
        , ('J-Hope', 25), ('Suga', 26), ('Jin', 27)
       , ('J-Hope', 12), ('Suga', 25), ('Jin', 34)
       , ('JK', 32), ('V', 44), ('Jimin',14), ('RM', 35)]
# Hint: use reduceByKey

In [54]:
rdd = spark.sparkContext.parallelize(list)

In [55]:
# Combine the values that share a key by adding them together.
sum_rdd = rdd.reduceByKey(lambda total, value: total + value)

In [56]:
# Collect the per-key totals to the driver; the bare expression is shown as the cell output.
sum_rdd.collect()

[('Jimin', 38),
 ('RM', 60),
 ('V', 68),
 ('Jin', 61),
 ('J-Hope', 37),
 ('Suga', 51),
 ('JK', 54)]

### Create some key-value pair RDDs

In [57]:
# (id, student name) records.
students = [
    (1, "Kareem"),
    (2, "Sara"),
    (3, "Ahmed"),
    (4, "Mona"),
]

# (id, grade) records. Id 3 has no grade and id 5 has no matching student,
# which makes the differences between the join types visible.
grades = [
    (1, "A"),
    (2, "B"),
    (4, "A+"),
    (5, "C"),
]

In [58]:
# Turn both Python lists into pair RDDs keyed by id.
students_rdd = spark.sparkContext.parallelize(students)
grades_rdd = spark.sparkContext.parallelize(grades)

# Show the contents of each RDD to confirm they match the source lists.
print(f"Students: {students_rdd.collect()}")
print(f"Grades: {grades_rdd.collect()}")

Students: [(1, 'Kareem'), (2, 'Sara'), (3, 'Ahmed'), (4, 'Mona')]
Grades: [(1, 'A'), (2, 'B'), (4, 'A+'), (5, 'C')]


### Perform join operations on the RDDs (`rdd_students`, `rdd_grades`)

In [60]:
# Reuse the pair RDDs already built from `students` and `grades` instead of
# parallelizing the same lists a second time, so both sets of names are
# guaranteed to refer to the same data. The outer-join cells use these names.
rdd_students = students_rdd
rdd_grades = grades_rdd

In [None]:
# Inner join on the key: only ids present in both RDDs are kept, each as
# (id, (student name, grade)).
joined_rdd = students_rdd.join(grades_rdd)
print(joined_rdd.collect())

[(1, ('Kareem', 'A')), (2, ('Sara', 'B')), (4, ('Mona', 'A+'))]

In [None]:
# Left outer join: every student is kept; a student without a grade gets None
# in the grade position.
left_full_grades = rdd_students.leftOuterJoin(rdd_grades)
print(left_full_grades.collect())

[(1, ('Kareem', 'A')),
 (2, ('Sara', 'B')),
 (3, ('Ahmed', None)),
 (4, ('Mona', 'A+'))]

In [None]:
# Right outer join: every grade is kept; a grade without a matching student
# gets None in the student position.
right_full_grades = rdd_students.rightOuterJoin(rdd_grades)
print(right_full_grades.collect())

[(1, ('Kareem', 'A')), (2, ('Sara', 'B')), (4, ('Mona', 'A+')), (5, (None, 'C'))]
