In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=b7623aa38f33114f0ecc0701c1768e6274f6b4633a034be63881ef9366f70843
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark import SparkContext

In [None]:
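from pyspark import SparkConf
# getOrCreate() reuses a running SparkContext, so re-running this cell no longer raises ValueError.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("Tutorial"))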

## RDDs support two types of operations:


*   **Transformations:** Operations applied to an RDD that produce a new RDD. Transformations are evaluated lazily: Spark only records them, and no computation starts until an action is called, at which point the whole chain of recorded transformations is executed. A few of the transformations RDDs provide are:

    * map
    * flatMap
    * filter
    * distinct
    * reduceByKey
    * mapPartitions
    * sortBy

*   **Actions:** Operations that make Spark actually run the computation on an RDD and either return the result to the driver program or write it to storage. A few of the actions are:

    * collect
    * collectAsMap
    * reduce
    * countByKey/countByValue
    * take
    * first
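
To make lazy evaluation concrete, here is a toy, plain-Python model. The `ToyRDD` class is hypothetical and not part of PySpark: its transformations only record a step, and the action `collect()` is what finally runs them. Real Spark additionally plans, partitions and distributes this work, so treat this as a sketch of the idea only.

```python
class ToyRDD:
    """Hypothetical toy model of lazy evaluation; not part of PySpark."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # recorded transformations, not yet executed

    # Transformations: return a new ToyRDD with one more recorded step.
    def map(self, f):
        return ToyRDD(self._data, self._steps + (("map", f),))

    def filter(self, pred):
        return ToyRDD(self._data, self._steps + (("filter", pred),))

    # Action: only now are the recorded steps executed, in order.
    def collect(self):
        out = self._data
        for kind, f in self._steps:
            if kind == "map":
                out = [f(x) for x in out]
            else:
                out = [x for x in out if f(x)]
        return out


calls = []


def add_ten(x):
    calls.append(x)  # record every time the function actually runs
    return x + 10


pipeline = ToyRDD([1, 2, 3, 4]).map(add_ten).filter(lambda x: x % 2 == 0)
print(len(calls))          # 0: defining the pipeline ran nothing
print(pipeline.collect())  # [12, 14]
print(len(calls))          # 4: the action triggered the work
```

Calling `collect()` a second time would run `add_ten` again, much as Spark recomputes an uncached RDD for every action unless `.cache()` or `.persist()` is used.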





## Creating and displaying an RDD

In [None]:
myRDD = sc.parallelize([('Koustav', 22), ('Swathi', 24), ('Rahul',24), ('Jay', 27), ('Usha', 25), ('Sanjay', 25), ('Somnath', 26), ('Jay', 27), ('Koustav', 28), ('Somnath', 21), ('Jay', 31)])

### The .collect() Action
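The **.collect()** action returns all the elements of an RDD to the driver as a Python list, which makes it convenient for displaying the contents of a small RDD. Because every element is copied into the driver's memory, avoid calling it on large RDDs; **.take(n)** is a safer way to inspect them.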

In [None]:
myRDD.collect()

[('Koustav', 22),
 ('Swathi', 24),
 ('Rahul', 24),
 ('Jay', 27),
 ('Usha', 25),
 ('Sanjay', 25),
 ('Somnath', 26),
 ('Jay', 27),
 ('Koustav', 28),
 ('Somnath', 21),
 ('Jay', 31)]

### The .count() Action
The **.count()** action returns the number of elements in an RDD. It is a quick way to check that the expected number of elements was loaded.

In [None]:
myRDD.count()

11

### The .first() Action
The **.first()** action returns the first element of an RDD. It is a cheap way to check that the loaded data has the expected shape and type.

In [None]:
myRDD.first()

('Koustav', 22)

### The .take() Action
The **.take(n)** action returns the first n elements of an RDD as a list. The argument n is an integer giving the number of elements to retrieve.

In [None]:
myRDD.take(3)

[('Koustav', 22), ('Swathi', 24), ('Rahul', 24)]

### The .reduce() Action
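The **.reduce(f)** action aggregates the elements of an RDD into a single value using a function f (usually a lambda) that takes two elements and returns one. Spark applies f within each partition and then combines the partial results, so f should be commutative and associative for the result to be well defined.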

In [None]:
reduce_rdd = sc.parallelize([1,3,4,6])
reduce_rdd.reduce(lambda x, y : x + y)

14
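
To see why the function passed to `.reduce()` should be associative and commutative, here is a plain-Python model that, like Spark, reduces each partition separately and then reduces the partial results. `partitioned_reduce` is a hypothetical helper, and splitting the list into equal-sized chunks is an assumption; Spark's actual partitioning may differ.

```python
from functools import reduce


def partitioned_reduce(data, f, num_partitions):
    """Hypothetical plain-Python model of RDD.reduce (not a PySpark API):
    reduce each partition separately, then reduce the partial results."""
    if not data:
        # PySpark likewise raises ValueError when reducing an empty RDD.
        raise ValueError("cannot reduce empty data")
    size = -(-len(data) // num_partitions)  # ceiling division: elements per partition
    partitions = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(f, part) for part in partitions]
    return reduce(f, partials)


data = [1, 3, 4, 6]
add = lambda x, y: x + y
sub = lambda x, y: x - y

print(partitioned_reduce(data, add, 1))  # 14
print(partitioned_reduce(data, add, 2))  # 14: addition does not care how data is split
print(partitioned_reduce(data, sub, 1))  # -12
print(partitioned_reduce(data, sub, 2))  # 0: subtraction gives a different answer
```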

### The .saveAsTextFile() Action
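The **.saveAsTextFile(path)** action saves an RDD as text, one element per line. The path names a directory rather than a single file: Spark writes one part-file (for example part-00000) per partition inside it, and the call fails if the directory already exists.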

In [None]:
myRDD.saveAsTextFile('rdd.txt')
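
PySpark writes each element that is not already a string using its `str()` form, so the tuples above end up as lines such as `('Koustav', 22)`. The plain-Python sketch below mirrors that round trip; `ast.literal_eval` is one way (an assumption, not the only option) to turn such lines back into tuples. In PySpark the same idea would be `sc.textFile('rdd.txt').map(ast.literal_eval)`.

```python
import ast

records = [('Koustav', 22), ('Swathi', 24), ('Rahul', 24)]

# What saveAsTextFile writes: the str() form of each element, one per line.
lines = [str(r) for r in records]
print(lines[0])  # ('Koustav', 22)

# Reading text back yields strings; literal_eval safely parses Python literals.
restored = [ast.literal_eval(line) for line in lines]
print(restored == records)  # True
```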

### The .countByKey() Action
The **.countByKey()** action counts the number of elements for each key in an RDD of key-value pairs. It returns the counts to the driver as a dictionary, so the standard dictionary methods **.keys()**, **.values()** and **.items()** can be used to iterate over the result.

In [None]:
dict_rdd = myRDD.countByKey().items()
for key, value in dict_rdd:
    print(key, value)

Koustav 2
Swathi 1
Rahul 1
Jay 3
Usha 1
Sanjay 1
Somnath 2
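
Both `.countByKey()` and its sibling `.countByValue()` bring a dictionary of counts back to the driver, so they suit data with a modest number of distinct keys. Their results can be modelled in plain Python with `collections.Counter`; this sketches what is computed, not how Spark computes it.

```python
from collections import Counter

pairs = [('Koustav', 22), ('Swathi', 24), ('Rahul', 24), ('Jay', 27),
         ('Usha', 25), ('Sanjay', 25), ('Somnath', 26), ('Jay', 27),
         ('Koustav', 28), ('Somnath', 21), ('Jay', 31)]

# countByKey: how many pairs share each key (the values are ignored).
key_counts = Counter(key for key, _ in pairs)
# countByValue: how many times each whole element occurs.
value_counts = Counter(pairs)

print(key_counts['Jay'])          # 3
print(value_counts[('Jay', 27)])  # 2
```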


### The .map() Transformation
The **.map()** transformation applies a function (often a lambda) to each element of an RDD and returns a new RDD containing the results, one output element per input element. For example, to add 10 to every element of an RDD, pass **.map()** a function that adds 10.

In [None]:
my_rdd = sc.parallelize([1,2,3,4])
print(my_rdd.map(lambda x: x + 10).collect())

[11, 12, 13, 14]


### The .filter() Transformation
The **.filter()** transformation keeps only the elements of an RDD that satisfy a condition. It takes a function (often a lambda) that returns True or False for each element and, being a transformation, returns a new RDD containing the elements for which the function returned True. For example, to keep only the even numbers, use **.filter()** with a condition that tests divisibility by 2.

In [None]:
filter_rdd = sc.parallelize([2, 3, 4, 5, 6, 7])
filter_rdd.filter(lambda x: x%2 == 0).collect()

[2, 4, 6]

We can also filter strings. For example, to find the guests in a list whose names start with a certain letter, we can use **.filter()** together with **str.startswith()**.

In [None]:
filter_rdd_2 = sc.parallelize(['Rahul', 'Swathi', 'Rohan', 'Shreya', 'Priya'])
filter_rdd_2.filter(lambda x: x.startswith('S')).collect()

['Swathi', 'Shreya']

### The .union() Transformation
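The **.union()** transformation combines two RDDs into a single RDD containing every element of both inputs. Unlike a mathematical set union, it does not remove duplicates: in the output below, 6 appears twice because it is both even and a multiple of 3. Call **.distinct()** on the result if duplicates are not wanted.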

In [None]:
union_inp = sc.parallelize([2,4,5,6,7,8,9,10,11])
union_rdd_1 = union_inp.filter(lambda x: x % 2 == 0)
union_rdd_2 = union_inp.filter(lambda x: x % 3 == 0)
print(union_rdd_1.union(union_rdd_2).collect())

[2, 4, 6, 8, 10, 6, 9]
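
In plain-Python terms, `.union()` behaves like list concatenation rather than set union. The sketch below reproduces the output above and then removes duplicates, which in PySpark would be `union_rdd_1.union(union_rdd_2).distinct()`. Note that Spark's `distinct()` does not promise to keep the original order, whereas `dict.fromkeys` does.

```python
evens = [2, 4, 6, 8, 10]
multiples_of_3 = [6, 9]

# union keeps every element of both inputs, duplicates included.
union = evens + multiples_of_3
print(union)  # [2, 4, 6, 8, 10, 6, 9]

# A set-style union needs an explicit de-duplication step (first occurrence wins here).
deduplicated = list(dict.fromkeys(union))
print(deduplicated)  # [2, 4, 6, 8, 10, 9]
```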


### The .flatMap() Transformation
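The **.flatMap()** transformation is similar to **.map()**, except that the function it applies returns an iterable for each element, and the items of those iterables are flattened into a single RDD. Each input element can therefore produce zero, one or many output elements; below, each sentence produces one element per word.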

In [None]:
flatmap_rdd = sc.parallelize(["Hello Class", "This is PySpark RDD Transformations"])
flatmap_rdd.flatMap(lambda x: x.split(" ")).collect()

['Hello', 'Class', 'This', 'is', 'PySpark', 'RDD', 'Transformations']
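
The difference between `.map()` and `.flatMap()` is easiest to see side by side. This plain-Python sketch (not PySpark code) uses a list comprehension in place of map and `itertools.chain.from_iterable` for the flattening step of flatMap, on the same two sentences.

```python
from itertools import chain

sentences = ["Hello Class", "This is PySpark RDD Transformations"]


def split_words(sentence):
    return sentence.split(" ")


# map: exactly one output per input, so each sentence becomes one list of words.
mapped = [split_words(s) for s in sentences]
print(mapped)
# [['Hello', 'Class'], ['This', 'is', 'PySpark', 'RDD', 'Transformations']]

# flatMap: the per-input iterables are flattened into one sequence.
flat_mapped = list(chain.from_iterable(split_words(s) for s in sentences))
print(flat_mapped)
# ['Hello', 'Class', 'This', 'is', 'PySpark', 'RDD', 'Transformations']
```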

### The .reduceByKey() Transformation
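The **.reduceByKey()** transformation merges the values for each key using a function (usually a lambda) that combines two values into one. Spark first combines the values for each key locally within each partition and then merges these partial results across partitions, so the function should be associative and commutative. Being a transformation, it returns a new RDD of (key, merged value) pairs.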

In [None]:
myRDD.reduceByKey(lambda x, y: x + y).collect()

[('Koustav', 50),
 ('Swathi', 24),
 ('Rahul', 24),
 ('Jay', 85),
 ('Usha', 25),
 ('Sanjay', 25),
 ('Somnath', 47)]
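
The merge that `.reduceByKey()` performs can be modelled in plain Python with a dictionary. `reduce_by_key` below is a hypothetical helper, not a PySpark API, and it folds values one after another; Spark instead folds within each partition and then merges the partial results, which is why the function must be associative and commutative.

```python
def reduce_by_key(pairs, f):
    """Hypothetical plain-Python model of RDD.reduceByKey (not a PySpark API):
    fold the values of each key together with the binary function f."""
    merged = {}
    for key, value in pairs:
        merged[key] = f(merged[key], value) if key in merged else value
    return list(merged.items())


pairs = [('Koustav', 22), ('Swathi', 24), ('Rahul', 24), ('Jay', 27),
         ('Usha', 25), ('Sanjay', 25), ('Somnath', 26), ('Jay', 27),
         ('Koustav', 28), ('Somnath', 21), ('Jay', 31)]

totals = dict(reduce_by_key(pairs, lambda x, y: x + y))
print(totals['Jay'], totals['Koustav'], totals['Somnath'])  # 85 50 47
```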

### The .sortByKey() Transformation
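The **.sortByKey()** transformation sorts an RDD of key-value pairs by key, in ascending order by default or in descending order with ascending=False, and returns a new RDD. Only the keys are compared, so values that share a key are not sorted among themselves (note Somnath 26 before Somnath 21 below).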

In [None]:
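# ascending is a boolean; a string such as 'ascending' only works because non-empty strings are truthy
myRDD.sortByKey(ascending=True).collect()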

[('Jay', 27),
 ('Jay', 27),
 ('Jay', 31),
 ('Koustav', 22),
 ('Koustav', 28),
 ('Rahul', 24),
 ('Sanjay', 25),
 ('Somnath', 26),
 ('Somnath', 21),
 ('Swathi', 24),
 ('Usha', 25)]
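
The plain-Python sketch below mirrors `sortByKey` on the same pairs by sorting on the key alone. Python's `sorted` is stable, so tied keys keep their input order; Spark does not document an order for ties, so do not rely on it there. Sorting on whole tuples (key, then value) corresponds to something like `myRDD.sortBy(lambda kv: (kv[0], kv[1]))` in PySpark.

```python
from operator import itemgetter

pairs = [('Koustav', 22), ('Swathi', 24), ('Rahul', 24), ('Jay', 27),
         ('Usha', 25), ('Sanjay', 25), ('Somnath', 26), ('Jay', 27),
         ('Koustav', 28), ('Somnath', 21), ('Jay', 31)]

# Sort on the key only (element 0); the values play no part in the ordering.
ascending = sorted(pairs, key=itemgetter(0))
descending = sorted(pairs, key=itemgetter(0), reverse=True)
# Tuples compare element by element, so this sorts by key, then by value.
by_key_then_value = sorted(pairs)

print(ascending[:3])             # [('Jay', 27), ('Jay', 27), ('Jay', 31)]
print(descending[0])             # ('Usha', 25)
print(by_key_then_value[7:9])    # [('Somnath', 21), ('Somnath', 26)]
```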

### The .groupByKey() Transformation
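The **.groupByKey()** transformation groups together all the values that share a key and returns a new RDD of (key, iterable of values) pairs. Each group is an iterable rather than a list, which is why the loop below converts it with **list()**. Because every individual value is shuffled across the network, prefer **.reduceByKey()** when the goal is an aggregate such as a sum.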

In [None]:
grouped = myRDD.groupByKey().collect()  # a Python list of (key, iterable of values) pairs
for key, values in grouped:
    print(key, list(values))

Koustav [22, 28]
Swathi [24]
Rahul [24]
Jay [27, 27, 31]
Usha [25]
Sanjay [25]
Somnath [26, 21]
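
A plain-Python model of `groupByKey` followed by a per-group sum shows that it yields the same totals as `.reduceByKey()` with addition. The difference lies in what gets shuffled: `groupByKey` moves every individual value before anything is summed, while `reduceByKey` sums within each partition first. This sketches the semantics only, not how Spark executes them.

```python
from collections import defaultdict

pairs = [('Koustav', 22), ('Swathi', 24), ('Rahul', 24), ('Jay', 27),
         ('Usha', 25), ('Sanjay', 25), ('Somnath', 26), ('Jay', 27),
         ('Koustav', 28), ('Somnath', 21), ('Jay', 31)]

# groupByKey: gather every value of a key into one group.
groups = defaultdict(list)
for key, value in pairs:
    groups[key].append(value)
print(groups['Jay'])  # [27, 27, 31]

# Summing each group reproduces reduceByKey(lambda x, y: x + y).
totals = {key: sum(values) for key, values in groups.items()}
print(totals['Jay'])  # 85
```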


**Filter and count:** Create an RDD of numbers from 1 to 100 and filter out all the even numbers. Then, count the number of elements in the resulting RDD.

In [None]:
# Create an RDD of numbers from 1 to 100
rdd = sc.parallelize(range(1, 101))

# Filter out even numbers
filtered_rdd = rdd.filter(lambda x: x % 2 != 0)

# Count the number of elements in the resulting RDD
count = filtered_rdd.count()

print("Count of odd numbers:", count)

Count of odd numbers: 50


**Word count:** Create an RDD of sentences and count the number of occurrences of each word in the RDD. You can start by splitting the sentences into words and then using the reduceByKey function to count the occurrences of each word.

In [None]:
# Create an RDD of sentences
sentences = sc.parallelize(["Hello world", "Hello PySpark", "PySpark is awesome"])

# Split sentences into words and count their occurrences
word_counts = sentences \
    .flatMap(lambda x: x.split(" ")) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: x + y)

# Print the word counts
for (word, count) in word_counts.collect():
    print("{}: {}".format(word, count))

Hello: 2
world: 1
PySpark: 2
is: 1
awesome: 1


**Map and reduce:** Create an RDD of tuples where each tuple contains a number and its square. Then, use the reduce function to calculate the sum of the squares of all the numbers in the RDD.

In [None]:
# Create an RDD of tuples where each tuple contains a number and its square
rdd = sc.parallelize([(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)])

# Calculate the sum of the squares of all the numbers in the RDD using reduce()
sum_of_squares = rdd \
    .map(lambda x: x[1]) \
    .reduce(lambda x, y: x + y)

print("Sum of squares:", sum_of_squares)

Sum of squares: 55
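
As a quick arithmetic check, the closed-form formula for the sum of the first n squares agrees with this output for n = 5:

$$\sum_{k=1}^{n} k^2 = \frac{n(n+1)(2n+1)}{6}, \qquad \sum_{k=1}^{5} k^2 = \frac{5 \cdot 6 \cdot 11}{6} = 55$$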


**Group by key:** Create an RDD of tuples where each tuple contains a word and its length. Then, use the groupByKey function to group the tuples by word and calculate the average length of each word.

In [None]:
# Create an RDD of tuples where each tuple contains a word and its length
rdd = sc.parallelize([("Hello", 5), ("world", 5), ("PySpark", 7), ("is", 2), ("awesome", 7)])

# Group the tuples by word and calculate the average length of each word
avg_word_lengths = rdd \
    .groupByKey() \
    .map(lambda x: (x[0], sum(x[1]) / len(x[1])))

# Print the average word lengths
for (word, avg_length) in avg_word_lengths.collect():
    print("{}: {}".format(word, avg_length))

Hello: 5.0
world: 5.0
PySpark: 7.0
is: 2.0
awesome: 7.0
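
Every word in this exercise appears once, so each average equals the word's length. When keys do repeat, a common alternative to `groupByKey` is to carry (sum, count) pairs, which in PySpark could be written as `rdd.mapValues(lambda v: (v, 1)).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda t: t[0] / t[1])`. The plain-Python sketch below models those three steps on the (name, age) pairs from `myRDD`; `average_by_key` is a hypothetical helper, not a PySpark API.

```python
def average_by_key(pairs):
    """Hypothetical helper (not a PySpark API) mirroring
    mapValues -> reduceByKey -> mapValues for a per-key average."""
    # Step 1 (like mapValues): pair every value with a count of 1.
    with_counts = [(key, (value, 1)) for key, value in pairs]

    # Step 2 (like reduceByKey): add up sums and counts for each key.
    partials = {}
    for key, (total, count) in with_counts:
        if key in partials:
            prev_total, prev_count = partials[key]
            partials[key] = (prev_total + total, prev_count + count)
        else:
            partials[key] = (total, count)

    # Step 3 (like mapValues): divide each sum by its count.
    return {key: total / count for key, (total, count) in partials.items()}


ages = [('Koustav', 22), ('Swathi', 24), ('Rahul', 24), ('Jay', 27),
        ('Usha', 25), ('Sanjay', 25), ('Somnath', 26), ('Jay', 27),
        ('Koustav', 28), ('Somnath', 21), ('Jay', 31)]

averages = average_by_key(ages)
print(averages['Koustav'], averages['Somnath'])  # 25.0 23.5
```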


**Join:** Create two RDDs, one containing (customer ID, customer name) pairs and another containing (customer ID, order ID) pairs. Then, use the join function to join the two RDDs on the customer ID and create an RDD of tuples where each tuple contains an order ID, customer name, and customer ID.

In [None]:
# Create RDDs of customer IDs and names, and order IDs and customer IDs
customers_rdd = sc.parallelize([(1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "David")])
orders_rdd = sc.parallelize([(1, 101), (2, 102), (3, 103), (4, 104)])

# Join the two RDDs on the customer ID
joined_rdd = orders_rdd.join(customers_rdd)

In [None]:
joined_rdd.collect()

[(2, (102, 'Bob')),
 (4, (104, 'David')),
 (1, (101, 'Alice')),
 (3, (103, 'Charlie'))]

In [None]:

# Map the joined RDD to tuples of order ID, customer name, and customer ID
result_rdd = joined_rdd.map(lambda x: (x[1][0], x[1][1], x[0]))

# Collect the result RDD to the driver program and display it
result_rdd.collect()

[(102, 'Bob', 2), (104, 'David', 4), (101, 'Alice', 1), (103, 'Charlie', 3)]
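
`join` performs an inner join on the key: each left pair is matched with every right pair that has the same key, and keys present on only one side are dropped. The plain-Python model below (`inner_join` is a hypothetical helper, not a PySpark API) adds a hypothetical order for customer 5, who is not in the customer list, to show it disappearing; in PySpark, `leftOuterJoin` would keep it, paired with `None`. Spark does not guarantee output order, which is why the collected result above is not sorted by ID.

```python
def inner_join(left, right):
    """Hypothetical plain-Python model of RDD.join (not a PySpark API):
    emit (key, (left_value, right_value)) for every pair of matching keys."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    return [(key, (lv, rv))
            for key, lv in left
            for rv in right_by_key.get(key, [])]


# (customer_id, order_id); the order for customer 5 is a hypothetical extra row.
orders = [(1, 101), (2, 102), (3, 103), (4, 104), (5, 105)]
customers = [(1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "David")]

joined = inner_join(orders, customers)
# Reshape to (order_id, customer_name, customer_id), as in the cell above.
result = [(order_id, name, cust_id) for cust_id, (order_id, name) in joined]
print(result)
# [(101, 'Alice', 1), (102, 'Bob', 2), (103, 'Charlie', 3), (104, 'David', 4)]
```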

### Reading data from a text file and displaying the first element

In [None]:
New_RDD = sc.textFile("/content/sample.txt")
New_RDD.take(1)

['Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce eros libero, aliquam sit amet dolor aliquam, vestibulum feugiat eros. Morbi non elit dolor. Aenean dictum venenatis justo. Cras imperdiet mollis mauris, at dignissim eros pharetra at. Aenean semper diam non nisi efficitur, vitae vulputate nisl eleifend. Aenean sed maximus odio. Sed semper, justo eu eleifend euismod, sapien nibh venenatis purus, et sagittis nulla lacus id metus. Praesent efficitur diam et varius egestas. Donec in pellentesque elit, quis blandit augue. Morbi tristique sapien eu urna gravida, eu mattis felis laoreet. Etiam venenatis metus justo, vestibulum tincidunt magna tristique et.']

### Converting each line to lower case and splitting it into words


In [None]:
RDD_1 = New_RDD.map(lambda x: x.lower().split())
RDD_1.collect()

[['lorem',
  'ipsum',
  'dolor',
  'sit',
  'amet,',
  'consectetur',
  'adipiscing',
  'elit.',
  'fusce',
  'eros',
  'libero,',
  'aliquam',
  'sit',
  'amet',
  'dolor',
  'aliquam,',
  'vestibulum',
  'feugiat',
  'eros.',
  'morbi',
  'non',
  'elit',
  'dolor.',
  'aenean',
  'dictum',
  'venenatis',
  'justo.',
  'cras',
  'imperdiet',
  'mollis',
  'mauris,',
  'at',
  'dignissim',
  'eros',
  'pharetra',
  'at.',
  'aenean',
  'semper',
  'diam',
  'non',
  'nisi',
  'efficitur,',
  'vitae',
  'vulputate',
  'nisl',
  'eleifend.',
  'aenean',
  'sed',
  'maximus',
  'odio.',
  'sed',
  'semper,',
  'justo',
  'eu',
  'eleifend',
  'euismod,',
  'sapien',
  'nibh',
  'venenatis',
  'purus,',
  'et',
  'sagittis',
  'nulla',
  'lacus',
  'id',
  'metus.',
  'praesent',
  'efficitur',
  'diam',
  'et',
  'varius',
  'egestas.',
  'donec',
  'in',
  'pellentesque',
  'elit,',
  'quis',
  'blandit',
  'augue.',
  'morbi',
  'tristique',
  'sapien',
  'eu',
  'urna',
  'gravida,',


### Filtering stopwords out of the text

In [None]:
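# Stopwords are lower case because the text is lower-cased before filtering;
# a set gives fast membership tests.
stopwords = {'a', 'all', 'the', 'as', 'is', 'am', 'an', 'and',
             'be', 'been', 'from', 'had', 'i', 'i’d', 'why', 'with'}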

In [None]:
RDD_2 = New_RDD.flatMap(lambda x: x.lower().split())\
                  .filter(lambda x: x not in stopwords)
RDD_2.collect()

['lorem',
 'ipsum',
 'dolor',
 'sit',
 'amet,',
 'consectetur',
 'adipiscing',
 'elit.',
 'fusce',
 'eros',
 'libero,',
 'aliquam',
 'sit',
 'amet',
 'dolor',
 'aliquam,',
 'vestibulum',
 'feugiat',
 'eros.',
 'morbi',
 'non',
 'elit',
 'dolor.',
 'aenean',
 'dictum',
 'venenatis',
 'justo.',
 'cras',
 'imperdiet',
 'mollis',
 'mauris,',
 'at',
 'dignissim',
 'eros',
 'pharetra',
 'at.',
 'aenean',
 'semper',
 'diam',
 'non',
 'nisi',
 'efficitur,',
 'vitae',
 'vulputate',
 'nisl',
 'eleifend.',
 'aenean',
 'sed',
 'maximus',
 'odio.',
 'sed',
 'semper,',
 'justo',
 'eu',
 'eleifend',
 'euismod,',
 'sapien',
 'nibh',
 'venenatis',
 'purus,',
 'et',
 'sagittis',
 'nulla',
 'lacus',
 'id',
 'metus.',
 'praesent',
 'efficitur',
 'diam',
 'et',
 'varius',
 'egestas.',
 'donec',
 'in',
 'pellentesque',
 'elit,',
 'quis',
 'blandit',
 'augue.',
 'morbi',
 'tristique',
 'sapien',
 'eu',
 'urna',
 'gravida,',
 'eu',
 'mattis',
 'felis',
 'laoreet.',
 'etiam',
 'venenatis',
 'metus',
 'justo,',
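
Splitting on whitespace leaves punctuation attached, so the output above contains tokens such as 'amet,' and 'elit.' that a later count treats as different words from 'amet' and 'elit'. One option, sketched here in plain Python under the assumption that stripping ASCII punctuation from the ends of each token is acceptable for this text, is to normalize tokens before filtering; `normalize` and `STOPWORDS` are hypothetical names. In PySpark the same function could be passed to `.map()` between the `flatMap` and `filter` steps.

```python
import string

# Hypothetical stopword set, lower-cased to match lower-cased tokens.
STOPWORDS = {'a', 'all', 'the', 'as', 'is', 'am', 'an', 'and',
             'be', 'been', 'from', 'had', 'i', 'i’d', 'why', 'with'}


def normalize(token):
    """Hypothetical helper: lower-case a token and strip leading/trailing ASCII punctuation."""
    return token.lower().strip(string.punctuation)


line = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Morbi non elit dolor."
# Normalize first, then drop empty tokens and stopwords.
kept = [w for w in (normalize(t) for t in line.split()) if w and w not in STOPWORDS]
print(kept.count('elit'))  # 2: 'elit.' and 'elit' now count as the same word
```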

### Filtering the words starting with ‘c’


In [None]:
RDD_3 = RDD_2.filter(lambda x: x.startswith('c'))
RDD_3.collect()

['consectetur',
 'cras',
 'cras',
 'consequat',
 'cursus',
 'congue',
 'consectetur',
 'consequat',
 'consequat',
 'commodo',
 'curabitur',
 'cursus',
 'cras',
 'commodo',
 'curabitur',
 'commodo',
 'condimentum',
 'commodo',
 'cras',
 'consectetur',
 'consectetur',
 'cursus,',
 'commodo',
 'commodo.',
 'curabitur',
 'cursus']

### Counting word frequencies and sorting them in descending order


In [None]:
rdd_mapped = RDD_2.map(lambda x: (x, 1))   # (word, 1) for every word
rdd_grouped = rdd_mapped.groupByKey()       # (word, iterable of 1s)
rdd_frequency = rdd_grouped.mapValues(sum) \
    .map(lambda x: (x[1], x[0])) \
    .sortByKey(False)                       # (count, word), highest count first
rdd_frequency.collect()

[(15, 'eget'),
 (14, 'sed'),
 (14, 'in'),
 (12, 'et'),
 (11, 'eu'),
 (11, 'ac'),
 (10, 'non'),
 (9, 'ipsum'),
 (9, 'at'),
 (9, 'nulla'),
 (9, 'id'),
 (9, 'quis'),
 (9, 'tincidunt'),
 (8, 'sit'),
 (8, 'aliquam'),
 (8, 'vitae'),
 (8, 'lacus'),
 (7, 'aenean'),
 (7, 'maximus'),
 (7, 'sapien'),
 (7, 'donec'),
 (7, 'tristique'),
 (7, 'erat'),
 (7, 'ut'),
 (6, 'lorem'),
 (6, 'vestibulum'),
 (6, 'feugiat'),
 (6, 'morbi'),
 (6, 'dictum'),
 (6, 'venenatis'),
 (6, 'diam'),
 (6, 'eleifend'),
 (6, 'orci'),
 (6, 'mauris'),
 (5, 'dolor'),
 (5, 'fusce'),
 (5, 'elit'),
 (5, 'urna'),
 (5, 'magna'),
 (5, 'lacinia'),
 (5, 'auctor'),
 (5, 'vivamus'),
 (5, 'nec'),
 (5, 'augue'),
 (5, 'ultrices'),
 (5, 'commodo'),
 (5, 'maecenas'),
 (4, 'consectetur'),
 (4, 'eros'),
 (4, 'amet'),
 (4, 'cras'),
 (4, 'nisl'),
 (4, 'justo'),
 (4, 'nibh'),
 (4, 'efficitur'),
 (4, 'varius'),
 (4, 'pellentesque'),
 (4, 'felis'),
 (4, 'sollicitudin'),
 (4, 'quisque'),
 (4, 'leo'),
 (4, 'volutpat'),
 (4, 'sagittis.'),
 (4, 'rutrum')
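
The cell above builds a word-frequency ranking with `map`, `groupByKey` and `sortByKey`. For a small list, `collections.Counter` computes the same kind of ranking in plain Python; the sketch below uses made-up sample tokens, not the real file. `Counter.most_common()` orders ties by first occurrence, whereas Spark does not guarantee any order among words with equal counts. In PySpark, `sortBy(lambda kv: kv[1], ascending=False)` or `takeOrdered(n, key=lambda kv: -kv[1])` would rank (word, count) pairs without swapping them first.

```python
from collections import Counter

words = ['sed', 'in', 'eget', 'sed', 'eget', 'in', 'eget', 'non']  # hypothetical sample tokens

# Count each word and sort by count, highest first.
frequency = Counter(words).most_common()
print(frequency)  # [('eget', 3), ('sed', 2), ('in', 2), ('non', 1)]

# Same (count, word) layout as the notebook's rdd_frequency.
as_count_word = [(count, word) for word, count in frequency]
print(as_count_word)  # [(3, 'eget'), (2, 'sed'), (2, 'in'), (1, 'non')]
```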