<a href="https://colab.research.google.com/github/Dhaneshkp/Spark/blob/main/6.5_RDD_Operations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

---
---

<center> <h1> Transformations & Actions on RDDs </h1> </center>

In this notebook, we will implement several RDD transformations and actions using PySpark.



### `Transformations`


We will do the following transformations in this notebook:

* 1. **Map**
* 2. **FlatMap**
* 3. **Filter**
* 4. **Distinct**
* 5. **Union**
* 6. **Intersection**


### `Actions`


We will do the following Actions in this notebook:

* 1. **Collect**
* 2. **Take**
* 3. **Count**



---


### `IMPORTING THE REQUIRED LIBRARIES`

---

In [1]:
from pyspark.context import SparkContext

In [2]:
sc = SparkContext()
sc

---
---
#### ` Problem Statement`

Suppose there is an organization named **`Analytics 20`**. It has two branches, one in **`India`** and one in **`Dubai`**. We have generated random employee data for this organization: the file **`analytics_20_india.txt`** contains the employees of the India branch, and **`analytics_20_dubai.txt`** contains the employees of the Dubai branch.

Each line in both files contains 3 space-separated columns: the `Name of the employee`, the `Department Name` in which he/she works, and the `Place Name` to which the employee belongs. The data looks like this:

<center><img src="https://github.com/Dhaneshkp/Spark/blob/main/images/rdd_op_dataset.png?raw=1"></center>
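
To make the three-column layout concrete, here is a minimal plain-Python sketch that parses one line into named fields. The `Employee` type and the `parse_line` helper are hypothetical names introduced only for illustration, and the sketch assumes every line has exactly three space-separated values.

```python
from collections import namedtuple

# Hypothetical record type; the field names are illustrative only.
Employee = namedtuple("Employee", ["name", "department", "place"])


def parse_line(line):
    """Split one space-separated line into an Employee record."""
    # Assumes exactly three fields per line; a malformed line raises ValueError.
    name, department, place = line.split(" ")
    return Employee(name, department, place)


sample = parse_line("Keaton Data_Science India")
print(sample.department)  # Data_Science
```

A helper like this could be passed to an RDD's `map` transformation so that later steps can use field names instead of positional indexes.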

---
---

#### `Reading the file - analytics_20_india.txt`

---

In [3]:
# Read the file
analytics_india = sc.textFile("analytics_20_india.txt")

In [4]:
# Check the type of the object returned by textFile
type(analytics_india)

---
##### When Spark reads the file with `sc.textFile`, it returns an RDD in which each element is one line of the file.

---
---

#### `Action - collect`

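**collect** action returns all the elements of the RDD to the driver program as a Python list. Use it only when the result is small enough to fit in the driver's memory.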

---

In [5]:
# collect all records
analytics_india.collect()

['Keaton Data_Science India',
 'Idona Data_Science Australia',
 'Janna HR India',
 'Damon Data_Science India',
 'Rahim Marketing India',
 'Audrey Data_Science India',
 'Irma HR Dubai',
 'Tatum HR India',
 'Acton Data_Science India',
 'Ainsley Data_Science India',
 'Phillip Data_Science India',
 'Maite Marketing India',
 'Kevyn Marketing Australia',
 'Vielka HR India',
 'Risa Operations India',
 'Jael Accounts Dubai',
 'Erich Data_Science India',
 'Pearl Operations Australia',
 'Francesca Data_Science India',
 'Ross Sales India',
 'Tarik HR Dubai',
 'Lev HR India',
 'Nerea Accounts India',
 'Halla Sales India',
 'Daquan Legal India',
 'Ivan HR India',
 'Venus HR India',
 'Lareina Legal India',
 'Orlando Sales Australia',
 'Denise Accounts India',
 'Alvin Accounts Dubai',
 'Rafael Data_Science Australia',
 'Whoopi Data_Science Australia',
 'Norman Legal Dubai',
 'Forrest Sales Dubai',
 'Sigourney Legal India',
 'Stone Legal Scotland',
 'Todd Sales India',
 'Jerome Sales India',
 'Signe H

---
---
#### `Action - take`

**take** action returns the first n elements of the RDD (n is passed as an integer argument). It is similar to the `head` function of pandas.


---

In [6]:
# take 5 records
analytics_india.take(5)

['Keaton Data_Science India',
 'Idona Data_Science Australia',
 'Janna HR India',
 'Damon Data_Science India',
 'Rahim Marketing India']

---
---
#### `Transformation - map`

**map** transformation applies the same function to each element of the RDD and returns a new RDD with the results.

We will split each line into a list of words using **map**.

---

In [7]:
# Original data
analytics_india.take(5)

['Keaton Data_Science India',
 'Idona Data_Science Australia',
 'Janna HR India',
 'Damon Data_Science India',
 'Rahim Marketing India']

In [8]:
# map transformation - tokenize records
analytics_india_map = analytics_india.map(lambda x: x.split(' '))

In [9]:
# Check top 5 results
analytics_india_map.take(5)

[['Keaton', 'Data_Science', 'India'],
 ['Idona', 'Data_Science', 'Australia'],
 ['Janna', 'HR', 'India'],
 ['Damon', 'Data_Science', 'India'],
 ['Rahim', 'Marketing', 'India']]
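
As a rough mental model, **map** behaves like a list comprehension applied to every element. The sketch below is a plain-Python analogue that runs without Spark; the `records` list is a hypothetical stand-in for the RDD and uses three lines from the output above.

```python
# Plain-Python analogue of: rdd.map(lambda x: x.split(' '))
# `records` is a small stand-in for the RDD, not the real dataset.
records = [
    "Keaton Data_Science India",
    "Idona Data_Science Australia",
    "Janna HR India",
]

# One output element per input element (one-to-one).
tokenized = [line.split(" ") for line in records]

print(tokenized[0])                    # ['Keaton', 'Data_Science', 'India']
print(len(tokenized) == len(records))  # True
```

The number of elements is unchanged: each input line becomes exactly one list of tokens.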

---
---
#### `Transformation - distinct`

**distinct** transformation returns a new RDD that contains only the unique elements of the source RDD.

Find out the list of unique places of origin of the employees in the India branch.

---

In [10]:
# Original map data
analytics_india_map.take(5)

[['Keaton', 'Data_Science', 'India'],
 ['Idona', 'Data_Science', 'Australia'],
 ['Janna', 'HR', 'India'],
 ['Damon', 'Data_Science', 'India'],
 ['Rahim', 'Marketing', 'India']]

In [11]:
# Create an RDD containing only the place (third field) of each record
analytics_india_places = analytics_india_map.map(lambda x: x[2])

In [12]:
# Apply the distinct transformation
analytics_india_distinct_places = analytics_india_places.distinct()

In [13]:
# Distinct country of people working in India branch
analytics_india_distinct_places.collect()

['India', 'Australia', 'Dubai', 'Scotland']

In [14]:
# Use count action to find out the total number of distinct places
analytics_india_distinct_places.count()

4
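
The effect of **distinct** followed by **count** can be sketched in plain Python as de-duplication followed by `len`. This is only an analogue under stated assumptions: the `places` list is a hypothetical stand-in holding the place values of the first seven records shown earlier, and unlike this sketch, Spark's `distinct` does not guarantee any particular output order.

```python
# Plain-Python analogue of: rdd.distinct() followed by rdd.count()
# `places` stands in for the RDD of place values (first seven records).
places = ["India", "Australia", "India", "India", "India", "India", "Dubai"]

# dict.fromkeys drops duplicates while keeping first-seen order.
# Spark's distinct() makes no such ordering promise.
distinct_places = list(dict.fromkeys(places))

print(distinct_places)       # ['India', 'Australia', 'Dubai']
print(len(distinct_places))  # 3
```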

---
---

#### `Transformation - filter`

**filter** transformation returns only the elements that satisfy the given condition.

Find the records of the people who belong to **India**.


---

In [15]:
# Original map data
analytics_india_map.take(5)

[['Keaton', 'Data_Science', 'India'],
 ['Idona', 'Data_Science', 'Australia'],
 ['Janna', 'HR', 'India'],
 ['Damon', 'Data_Science', 'India'],
 ['Rahim', 'Marketing', 'India']]

In [16]:
# Filter the people who belong to India
analytics_india_employee_india = analytics_india_map.filter(lambda x: x[2] == "India")

In [17]:
# Collect result
analytics_india_employee_india.collect()

[['Keaton', 'Data_Science', 'India'],
 ['Janna', 'HR', 'India'],
 ['Damon', 'Data_Science', 'India'],
 ['Rahim', 'Marketing', 'India'],
 ['Audrey', 'Data_Science', 'India'],
 ['Tatum', 'HR', 'India'],
 ['Acton', 'Data_Science', 'India'],
 ['Ainsley', 'Data_Science', 'India'],
 ['Phillip', 'Data_Science', 'India'],
 ['Maite', 'Marketing', 'India'],
 ['Vielka', 'HR', 'India'],
 ['Risa', 'Operations', 'India'],
 ['Erich', 'Data_Science', 'India'],
 ['Francesca', 'Data_Science', 'India'],
 ['Ross', 'Sales', 'India'],
 ['Lev', 'HR', 'India'],
 ['Nerea', 'Accounts', 'India'],
 ['Halla', 'Sales', 'India'],
 ['Daquan', 'Legal', 'India'],
 ['Ivan', 'HR', 'India'],
 ['Venus', 'HR', 'India'],
 ['Lareina', 'Legal', 'India'],
 ['Denise', 'Accounts', 'India'],
 ['Sigourney', 'Legal', 'India'],
 ['Todd', 'Sales', 'India'],
 ['Jerome', 'Sales', 'India'],
 ['Signe', 'HR', 'India'],
 ['Xavier', 'Legal', 'India'],
 ['Kevin', 'Customer_Support', 'India'],
 ['Michelle', 'Customer_Support', 'India'],
 ['Ign

---

Let's find the records of the people who belong to **Dubai** and work in the **HR** department.

---

In [18]:
# Apply the filter transformation
analytics_india_filtered_data = analytics_india_map.filter(lambda x: x[1] == "HR" and x[2] == "Dubai")

In [19]:
# collect the results
analytics_india_filtered_data.collect()

[['Irma', 'HR', 'Dubai'], ['Tarik', 'HR', 'Dubai']]
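
When a filter condition grows beyond a single comparison, a named predicate function can be easier to read than a lambda. The following plain-Python sketch shows the idea with the built-in `filter`; the function name `is_hr_from_dubai` and the small `records` list are hypothetical, chosen only for illustration.

```python
# Plain-Python analogue of: rdd.filter(predicate)
# `records` is a small stand-in for the tokenized RDD.
records = [
    ["Keaton", "Data_Science", "India"],
    ["Irma", "HR", "Dubai"],
    ["Janna", "HR", "India"],
    ["Jael", "Accounts", "Dubai"],
    ["Tarik", "HR", "Dubai"],
]


def is_hr_from_dubai(record):
    """Return True for HR employees whose place is Dubai."""
    name, department, place = record
    # Logical `and` short-circuits: place is checked only for HR records.
    return department == "HR" and place == "Dubai"


selected = list(filter(is_hr_from_dubai, records))
print(selected)  # [['Irma', 'HR', 'Dubai'], ['Tarik', 'HR', 'Dubai']]
```

The same predicate could be passed to an RDD's `filter` transformation in place of the lambda.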

---
---

#### `Transformation - flatMap`

* We saw that the **map** function performs a one-to-one transformation.
> * It transforms each element of a collection into one element of the resulting collection.

<center><img src ="images/map_transformation.png"></center>

* The **flatMap** transformation flattens the results: instead of one list per line, it returns a single flat collection of all the tokens.
> * Spark's **flatMap** function expresses a one-to-many transformation: each input element can produce zero or more output elements.

Let's see the difference.

---

In [20]:
# Original data
analytics_india.take(5)

['Keaton Data_Science India',
 'Idona Data_Science Australia',
 'Janna HR India',
 'Damon Data_Science India',
 'Rahim Marketing India']

In [21]:
# flatMap transformation - tokenize records
analytics_india_flatmap = analytics_india.flatMap(lambda x: x.split(' '))

In [22]:
# flatmap tokenize
analytics_india_flatmap.take(10)

['Keaton',
 'Data_Science',
 'India',
 'Idona',
 'Data_Science',
 'Australia',
 'Janna',
 'HR',
 'India',
 'Damon']

In [23]:
# map tokenize
analytics_india_map.take(10)

[['Keaton', 'Data_Science', 'India'],
 ['Idona', 'Data_Science', 'Australia'],
 ['Janna', 'HR', 'India'],
 ['Damon', 'Data_Science', 'India'],
 ['Rahim', 'Marketing', 'India'],
 ['Audrey', 'Data_Science', 'India'],
 ['Irma', 'HR', 'Dubai'],
 ['Tatum', 'HR', 'India'],
 ['Acton', 'Data_Science', 'India'],
 ['Ainsley', 'Data_Science', 'India']]
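
The difference between the two outputs above can be reproduced in plain Python, where `itertools.chain.from_iterable` plays the role of the flattening step. This is a sketch of the semantics only, with a hypothetical two-line `lines` list standing in for the RDD.

```python
from itertools import chain

# `lines` is a small stand-in for the RDD of raw text lines.
lines = ["Keaton Data_Science India", "Idona Data_Science Australia"]

# map analogue: one list of tokens per input line (nested result).
mapped = [line.split(" ") for line in lines]

# flatMap analogue: the per-line lists are concatenated into one flat list.
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(len(mapped))       # 2
print(len(flat_mapped))  # 6
print(flat_mapped[:3])   # ['Keaton', 'Data_Science', 'India']
```

With two input lines of three tokens each, the map analogue yields 2 elements and the flatMap analogue yields 6.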

---
---

#### `Transformation - union`

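Use **union** transformation to combine the places of origin from both branches - India and Dubai. Note that **union** does not remove duplicates: a place that appears in both RDDs appears twice in the result.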

---

In [26]:
# Create rdd for Dubai branch file
analytics_dubai = sc.textFile("analytics_20_dubai.txt")

In [27]:
# Dubai data
analytics_dubai.take(5)

['Leo Customer_Support Scotland',
 'Cyrus Customer_Support India',
 'Jolie Sales India',
 'Susan HR Australia',
 'Azalia Customer_Support Dubai']

In [28]:
# map transformation - tokenize records
analytics_dubai_places = analytics_dubai.map(lambda x: x.split(' '))

In [29]:
# Dubai tokenized records
analytics_dubai_places.take(5)

[['Leo', 'Customer_Support', 'Scotland'],
 ['Cyrus', 'Customer_Support', 'India'],
 ['Jolie', 'Sales', 'India'],
 ['Susan', 'HR', 'Australia'],
 ['Azalia', 'Customer_Support', 'Dubai']]

In [30]:
# Select distinct places from Dubai branch

# Select country from each record
analytics_dubai_places = analytics_dubai_places.map(lambda x: x[2])

# Get distinct places
analytics_dubai_distinct_places = analytics_dubai_places.distinct()

In [31]:
# Unique places from Dubai branch
analytics_dubai_distinct_places.collect()

['Scotland', 'India', 'Australia', 'Dubai', 'South_Africa']

In [32]:
# Unique places from India branch
analytics_india_distinct_places.collect()

['India', 'Australia', 'Dubai', 'Scotland']

In [33]:
# Union places from two branches
union_places = analytics_india_distinct_places.union(analytics_dubai_distinct_places)

In [34]:
union_places.collect()

['India',
 'Australia',
 'Dubai',
 'Scotland',
 'Scotland',
 'India',
 'Australia',
 'Dubai',
 'South_Africa']
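
The result above contains repeated places because **union** simply concatenates the two RDDs. The plain-Python sketch below mirrors this with list concatenation and then de-duplicates; in PySpark, the corresponding step would be to chain `distinct()` after `union`. The two lists are copied from the outputs above, and the variable names are hypothetical.

```python
# Plain-Python analogue of: india_rdd.union(dubai_rdd), then .distinct()
india_places = ["India", "Australia", "Dubai", "Scotland"]
dubai_places = ["Scotland", "India", "Australia", "Dubai", "South_Africa"]

# union analogue: concatenation keeps duplicates.
combined = india_places + dubai_places

# distinct analogue: drop duplicates (first-seen order is kept here,
# whereas Spark's distinct() does not guarantee an order).
unique_combined = list(dict.fromkeys(combined))

print(len(combined))    # 9
print(unique_combined)  # ['India', 'Australia', 'Dubai', 'Scotland', 'South_Africa']
```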

---
---

#### `Transformation - intersection`

Use **intersection** transformation to find out the common places of origin of the employees from both branches - India and Dubai.

---

In [35]:
# Intersection of both RDDs of unique places
common_places = analytics_india_distinct_places.intersection(analytics_dubai_distinct_places)

In [36]:
# Collect the results
common_places.collect()

['Dubai', 'India', 'Australia', 'Scotland']
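
Conceptually, **intersection** behaves like a set intersection: the result holds each common element once, even if the inputs contain duplicates. A plain-Python sketch of the same computation, using the two lists of places from the outputs above (the variable names are hypothetical):

```python
# Plain-Python analogue of: india_rdd.intersection(dubai_rdd)
india_places = ["India", "Australia", "Dubai", "Scotland"]
dubai_places = ["Scotland", "India", "Australia", "Dubai", "South_Africa"]

# Set intersection keeps only the places present in both branches.
# sorted() is used only to make the printed order deterministic.
common = sorted(set(india_places) & set(dubai_places))

print(common)  # ['Australia', 'Dubai', 'India', 'Scotland']
```

`South_Africa` is absent from the result because it appears only in the Dubai branch data.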