### Import the required libraries, then create the SparkContext

In [1]:
import numpy as np
import findspark

# Called before the `from pyspark import ...` line in the next cell.
# NOTE(review): presumably this is what makes the local Spark installation
# importable as `pyspark` — confirm against the findspark documentation.
findspark.init()


In [2]:
from pyspark import SparkConf, SparkContext

# Spark settings for this notebook: application name "MyApp", master "local[*]"
conf = SparkConf()
conf = conf.setAppName("MyApp")
conf = conf.setMaster("local[*]")

# getOrCreate is used instead of constructing SparkContext directly, so
# re-running this cell does not hit the "multiple instances" error
sc = SparkContext.getOrCreate(conf)

### Create and display an RDD from the following list

In [3]:
# (name, number) pairs used to build the first RDD.
# NOTE(review): the name `list` shadows Python's built-in `list`; it is kept
# because the following cell passes `list` to sc.parallelize.
list = [
    ('JK', 22),
    ('V', 24),
    ('Jimin', 24),
    ('RM', 25),
    ('J-Hope', 25),
    ('Suga', 26),
    ('Jin', 27),
]

In [4]:
# Turn the pairs held in `list` into an RDD
rdd = sc.parallelize(list)

# Collect every element of the RDD and print the resulting Python list
collected_pairs = rdd.collect()
print(collected_pairs)

[('JK', 22), ('V', 24), ('Jimin', 24), ('RM', 25), ('J-Hope', 25), ('Suga', 26), ('Jin', 27)]


### Create a sample1.txt file to contain the text shown below.

In [5]:
# Sample text that a later cell writes to sample1.txt.
# The literal starts with a newline immediately after the opening quotes, so
# the text (and therefore the file) begins with an empty line. Several lines
# also end with a trailing space; both are part of the string's content.
text='''
Utilitatis causa amicitia est quaesita.
Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Collatio igitur ista tenihil iuvat. 
Honesta oratio, Socratica, Platonis etiam. 
Primum in nostranepotestate est, quid meminerimus? 
Duo Reges: constructio interrete.
Quid, sietiam iucunda memoria est praeteritorum malorum? 
Si quidem, inquit, tollerem,'''

In [7]:
# This cell previously contained only `%%writefile sample1.txt` with no body,
# which fails with "UsageError: %%writefile is a cell magic, but the cell body
# is empty." The magic was removed so the notebook runs top to bottom;
# sample1.txt is created from `text` with open(...).write(...) in a later cell.


In [8]:
# Write `text` to sample1.txt as UTF-8; mode "w" replaces any existing content
with open("sample1.txt", mode="w", encoding="utf-8") as out_handle:
    out_handle.write(text)

# Confirmation message for the reader
print("File 'sample1.txt' created successfully!")

File 'sample1.txt' created successfully!


### Read the sample1.txt file into an RDD and display the first 4 elements

In [9]:
# Load sample1.txt into an RDD (rebinds `rdd`, which earlier held the pairs)
rdd = sc.textFile("sample1.txt")

# Take the first 4 elements and print them one per row
first_four = rdd.take(4)
print("\n".join(first_four))




Utilitatis causa amicitia est quaesita.
Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Collatio igitur ista tenihil iuvat. 


### Count the total number of rows in RDD

In [10]:
# Number of elements currently in `rdd`
row_count = rdd.count()

# Report the total
print("Total number of rows:", row_count)

Total number of rows: 9


### Create a function that converts the data to lower case and splits it

In [11]:
def lower_and_split(line):
    """Lower-case ``line`` and split it on whitespace.

    Args:
        line: The string to process.

    Returns:
        A list of the lower-cased, whitespace-separated tokens of ``line``.
        Punctuation is left attached to the tokens; an empty or
        whitespace-only string yields an empty list.
    """
    lowered = line.lower()
    tokens = lowered.split()
    return tokens

# flatMap calls lower_and_split on each element of `rdd` and flattens the
# returned lists into a single RDD of tokens
processed_rdd = rdd.flatMap(lower_and_split)

# Collect all tokens and print them
all_tokens = processed_rdd.collect()
print(all_tokens)


['utilitatis', 'causa', 'amicitia', 'est', 'quaesita.', 'lorem', 'ipsum', 'dolor', 'sit', 'amet,', 'consectetur', 'adipiscing', 'elit.', 'collatio', 'igitur', 'ista', 'tenihil', 'iuvat.', 'honesta', 'oratio,', 'socratica,', 'platonis', 'etiam.', 'primum', 'in', 'nostranepotestate', 'est,', 'quid', 'meminerimus?', 'duo', 'reges:', 'constructio', 'interrete.', 'quid,', 'sietiam', 'iucunda', 'memoria', 'est', 'praeteritorum', 'malorum?', 'si', 'quidem,', 'inquit,', 'tollerem,']


### Remove the stopwords from the previous text (i.e., filter them out).

In [12]:
# Words to drop from the text. Note that 'I' and 'I’d' are capitalised.
stopwords = [
    'a', 'all', 'the', 'as',
    'is', 'am', 'an', 'and',
    'be', 'been', 'from', 'had',
    'I', 'I’d', 'why', 'with',
]
# Hint: you may need to use flatMap


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54914)
Traceback (most recent call last):
  File "c:\Users\Shehab\anaconda3\Lib\socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "c:\Users\Shehab\anaconda3\Lib\socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "c:\Users\Shehab\anaconda3\Lib\socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "c:\Users\Shehab\anaconda3\Lib\socketserver.py", line 755, in __init__
    self.handle()
  File "C:\Big Data\spark\python\pyspark\accumulators.py", line 295, in handle
    poll(accum_updates)
  File "C:\Big Data\spark\python\pyspark\accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "C:\Big Data\spark\python\pyspark\accumulators.py", line 271, in accum_update

In [None]:
def process_text(line):
    """Lower-case ``line``, split it on whitespace and drop the stopwords.

    The module-level ``stopwords`` list is read on every call. Its entries
    are lower-cased before the comparison, because the words from ``line``
    are lower-cased too: without that, capitalised entries such as 'I' or
    'I’d' could never match and would not be removed.

    Args:
        line: The string to process.

    Returns:
        A list of the lower-cased tokens of ``line`` that are not stopwords.
        Punctuation stays attached to the tokens, so e.g. 'a,' is not
        treated as the stopword 'a'.
    """
    # Set for O(1) membership tests; normalised to the same case as the words
    blocked = {stop.lower() for stop in stopwords}
    return [word for word in line.lower().split() if word not in blocked]

# flatMap runs process_text on every element of `rdd` and merges the
# per-element word lists into one RDD of words
filtered_rdd = rdd.flatMap(process_text)


### Find the words starting with ‘c’

In [None]:
def starts_with_c(word):
    """Return True when ``word`` begins with a lower-case 'c', else False."""
    prefix = 'c'
    return word.startswith(prefix)

# Keep only the words for which starts_with_c returns True
words_with_c = filtered_rdd.filter(starts_with_c)

# Collect the matching words and print them
c_words = words_with_c.collect()
print(c_words)

### Reduce the data by key and sum it (use the data from the following list)

In [None]:
# (name, number) pairs with repeated names, given as the data for this task.
# NOTE(review): this rebinds `list`, which also shadows Python's built-in.
list = [
    ('JK', 22), ('V', 24), ('Jimin', 24), ('RM', 25),
    ('J-Hope', 25), ('Suga', 26), ('Jin', 27),
    ('J-Hope', 12), ('Suga', 25), ('Jin', 34),
    ('JK', 32), ('V', 44), ('Jimin', 14), ('RM', 35),
]
# Hint: use reduceByKey

In [None]:
# Pair RDD built from the same (name, number) data, written inline
# (rebinds `rdd`, which previously held the lines of sample1.txt)
rdd = sc.parallelize([
    ('JK', 22),
    ('V', 24),
    ('Jimin', 24),
    ('RM', 25),
    ('J-Hope', 25),
    ('Suga', 26),
    ('Jin', 27),
    ('J-Hope', 12),
    ('Suga', 25),
    ('Jin', 34),
    ('JK', 32),
    ('V', 44),
    ('Jimin', 14),
    ('RM', 35),
])

# Combine the values that share a key by adding them together
summed_rdd = rdd.reduceByKey(lambda left, right: left + right)

# Collect the per-key sums and print them
key_totals = summed_rdd.collect()
print(key_totals)


### Create some key-value pair RDDs

In [None]:
# First pair RDD: (name, age); each name appears twice
rdd1 = sc.parallelize([
    ('Alice', 25),
    ('Bob', 30),
    ('Charlie', 35),
    ('Alice', 28),
    ('Bob', 33),
    ('Charlie', 40),
])

# Second pair RDD: (name, city); 'David' has no entry in rdd1
rdd2 = sc.parallelize([
    ('Alice', 'New York'),
    ('Bob', 'Los Angeles'),
    ('Charlie', 'Chicago'),
    ('David', 'Houston'),
])

# Show the contents of both RDDs
rdd1_items = rdd1.collect()
print("RDD1:", rdd1_items)
rdd2_items = rdd2.collect()
print("RDD2:", rdd2_items)


### Perform Join operation on the RDDs (rdd1,rdd2)

In [None]:
# Inner join of rdd1 with rdd2 on the key (the name)
joined_rdd = rdd1.join(rdd2)

# Collect the joined pairs and print them
joined_items = joined_rdd.collect()
print(joined_items)
