# PySpark RDD Practice


In [None]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 70kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 48.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=60e1849a8e11c6a77af0f82326ba1d39c9e07ce701ecccbd73445a01557781f0
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [None]:
import os

from pyspark import SparkContext, SparkConf

In [None]:
# getOrCreate() returns the already-running SparkContext if one exists. Without it,
# re-running this cell fails with "Cannot run multiple SparkContexts at once".
# Note: when a context already exists, the conf passed here is ignored.
# master "local" runs Spark in-process with a single worker thread.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("pyspark-rdd-practice")
)

## Task 1

In [None]:
# Distribute a small in-memory list of strings as an RDD.
RDD = sc.parallelize(
    ["Spark", "is", "a", "framework", "for", "Big Data processing"]
)

# Show which Python class parallelize() handed back.
print("The type of RDD is", RDD.__class__)

The type of RDD is <class 'pyspark.rdd.RDD'>


## Task 2

In [None]:
# Build a pair RDD: every element is a (name, value) tuple.
Rdd = sc.parallelize([
    ("Ahmed", 11),
    ("Khaled", 12),
    ("Hassan", 13),
    ("Amr", 14),
    ("Yassin", 15),
    ("Omar", 16),
])

# reduceByKey() merges the values that share a key using the given function.
# Every key here is unique, so each value passes through unchanged.
Rdd_Reduced = Rdd.reduceByKey(lambda left, right: left + right)

# collect() brings the reduced pairs back to the driver for printing.
for key, value in Rdd_Reduced.collect():
    print("Key {} has {} Counts".format(key, value))

Key Ahmed has 11 Counts
Key Khaled has 12 Counts
Key Hassan has 13 Counts
Key Amr has 14 Counts
Key Yassin has 15 Counts
Key Omar has 16 Counts


## Task 3

In [None]:
# Order the reduced pair RDD by its key (the name), from Z down to A.
Rdd_Reduced_Sort = Rdd_Reduced.sortByKey(ascending=False)

# Pull the sorted pairs to the driver and print them in that order.
for name, value in Rdd_Reduced_Sort.collect():
    print("Key {} has {} Counts".format(name, value))

Key Yassin has 15 Counts
Key Omar has 16 Counts
Key Khaled has 12 Counts
Key Hassan has 13 Counts
Key Amr has 14 Counts
Key Ahmed has 11 Counts


## Task 4 (Counting by keys)

In [None]:
# countByKey() is an action: it returns to the driver how many elements
# carry each key (an occurrence count, not a sum of the values).
total = Rdd.countByKey()

# The result is an ordinary Python mapping (a collections.defaultdict in the
# recorded output), so it is walked with .items() like any dict.
print("The type of total is", type(total))

for key, occurrences in total.items():
    print("key", key, "has", occurrences, "counts")

The type of total is <class 'collections.defaultdict'>
key Ahmed has 1 counts
key Khaled has 1 counts
key Hassan has 1 counts
key Amr has 1 counts
key Yassin has 1 counts
key Omar has 1 counts


## Task 5 (Create a base RDD and transform it)

In [None]:
# Create a baseRDD from the file path.
STOP_WORDS_PATH = "stop_words.txt"

# Only prompt for an upload when the file is missing. If the file already exists,
# Colab saves a fresh upload under a de-duplicated name (e.g. "stop_words (4).txt").
# textFile() would then silently keep reading the old copy. Skipping the
# interactive widget also keeps Restart & Run All non-blocking.
uploaded = {}
if not os.path.exists(STOP_WORDS_PATH):
    from google.colab import files  # Colab-only, so imported only when needed
    uploaded = files.upload()

# textFile() creates a base RDD with one element per line of the file.
baseRDD = sc.textFile(STOP_WORDS_PATH)

# NOTE(review): this reads the stop-word list itself as the corpus, so the
# stop-word filter in the next task removes every word. Presumably a separate
# text file was intended here — confirm.

# flatMap() splits each line on whitespace and flattens into one RDD of words.
splitRDD = baseRDD.flatMap(lambda line: line.split())

# count() is an action: it triggers the read and returns the number of words.
print("Total number of words in splitRDD:", splitRDD.count())


Saving stop_words.txt to stop_words (4).txt
Total number of words in splitRDD: 127


## Task 6 (Remove stop words and reduce the dataset)

In [None]:
# Load the stop-word list. The filter below references `stop_words`, but it was
# never defined anywhere in the notebook, so the first action on this RDD
# (e.g. take() in Task 7) raised NameError on a fresh kernel.
# A set gives O(1) membership tests. Entries are lower-cased to match the
# word.lower() comparison in the filter.
with open("stop_words.txt", encoding="utf-8") as stop_file:
    stop_words = {w.lower() for w in stop_file.read().split()}

# NOTE(review): if splitRDD was built from this same stop_words.txt (as in
# Task 5), every word is filtered out and resultRDD is empty — confirm the
# intended corpus.

# filter() keeps only the words that are not stop words (case-insensitive).
splitRDD_no_stop = splitRDD.filter(lambda word: word.lower() not in stop_words)

# map() turns each remaining word into a (word, 1) pair.
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda word: (word, 1))

# reduceByKey() sums the 1s per word, yielding (word, occurrences) pairs.
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)

## Task 7 (Print word frequencies)

In [None]:
# Peek at the first 10 (word, count) pairs. take(N) returns the first N
# elements as stored, not the N most frequent ones.
for pair in resultRDD.take(10):
    print(pair)

# Flip each (word, count) pair to (count, word) so the count becomes the key.
resultRDD_swap = resultRDD.map(lambda pair: (pair[1], pair[0]))

# Order by count, highest first.
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# Print the 10 most frequent words with their counts.
for count, word in resultRDD_swap_sort.take(10):
    print("{} has {} counts".format(word, count))