
# **RDD - Transformation and Action Commands**

## **Environment Setup**

In [1]:
!pip install pyspark

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('RDD T&A')
sc = SparkContext.getOrCreate(conf=conf)

## **Creating RDD**

In [2]:
data = ['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink', '1 2 3 4']

In [3]:
rdd = sc.parallelize(data)
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

## **Action Commands**
### 1) collect()

In [4]:
rdd.collect()

['1 2 3 4',
 'Apple Bannana Kiwi Dragonfruit',
 'Red Yellow Green Pink',
 '1 2 3 4']

### 2) count()

In [5]:
rdd.count()

4

### 3) take()

In [6]:
rdd.take(3)

['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink']

### 4) first()

In [7]:
rdd.first()

'1 2 3 4'

### 5) takeSample()

In [8]:
rdd.takeSample(withReplacement=True, num=5)

['Red Yellow Green Pink',
 'Red Yellow Green Pink',
 '1 2 3 4',
 'Red Yellow Green Pink',
 '1 2 3 4']

In [9]:
rdd.takeSample(False, 3)

['Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink', '1 2 3 4']
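
takeSample() is an action, so the sample comes back to the driver as a Python list. With `withReplacement=True` the same element can be drawn more than once. With `False`, each position is drawn at most once, although equal values such as the two '1 2 3 4' strings can both appear. The sketch below mimics that difference on a local list using the standard `random` module. It is only an analogy, not Spark's sampling algorithm, and the helper name `local_take_sample` is hypothetical. Fixing a seed makes the draw repeatable, which `takeSample` also supports through its optional `seed` argument.

```python
import random

data = ['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink', '1 2 3 4']

def local_take_sample(items, with_replacement, num, seed=None):
    """Hypothetical local stand-in for RDD.takeSample on a plain list."""
    rng = random.Random(seed)
    if with_replacement:
        # Each draw is independent, so the same index may be picked again.
        return [items[rng.randrange(len(items))] for _ in range(num)]
    # Without replacement each index is used at most once,
    # so the sample is capped at len(items).
    indices = rng.sample(range(len(items)), min(num, len(items)))
    return [items[i] for i in indices]

print(local_take_sample(data, True, 5, seed=42))
print(local_take_sample(data, False, 3, seed=42))
```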

### 6) takeOrdered()

In [10]:
rdd.takeOrdered(3)

['1 2 3 4', '1 2 3 4', 'Apple Bannana Kiwi Dragonfruit']
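
takeOrdered(n) returns the n smallest elements in ascending order. For strings the order is lexicographic by character code, which is why '1 2 3 4' (starting with the digit '1') sorts before 'Apple ...'. On a plain Python list, `heapq.nsmallest` gives the same ordering, including the optional key function. The sketch below assumes the data fits in memory and only illustrates the ordering rules.

```python
import heapq

data = ['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink', '1 2 3 4']

# Smallest three strings in ascending (lexicographic) order.
print(heapq.nsmallest(3, data))
# ['1 2 3 4', '1 2 3 4', 'Apple Bannana Kiwi Dragonfruit']

# A key function changes the ordering; negating the length puts the longest strings first.
print(heapq.nsmallest(2, data, key=lambda s: -len(s)))
# ['Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink']
```

The Spark counterpart of the second call would be `rdd.takeOrdered(2, key=lambda s: -len(s))`.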

### 7) Read and save files

In [11]:
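# To read a text file (one element per line)
# text = sc.textFile('path')

# To save as text files (Spark writes a directory of part files)
# rdd.saveAsTextFile('path')

# To save as a Hadoop SequenceFile (the RDD must contain (key, value) pairs)
# rdd.saveAsSequenceFile('path')

# To save serialized Python objects (reload with sc.pickleFile('path'));
# saveAsObjectFile() exists only in the Scala/Java API
# rdd.saveAsPickleFile('path')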

### 8) countByKey()

In [12]:
rdd.map(lambda x: (1,x)).countByKey()

defaultdict(int, {1: 4})

### 9) countByValue()

In [13]:
rdd.countByValue()

defaultdict(int,
            {'1 2 3 4': 2,
             'Apple Bannana Kiwi Dragonfruit': 1,
             'Red Yellow Green Pink': 1})
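
Both countByKey() and countByValue() are actions that return an ordinary dictionary to the driver. They are therefore only appropriate when the number of distinct keys or values is small. On a local list, `collections.Counter` produces the same counts. The sketch below is a single-machine analogy, not a description of how Spark computes them.

```python
from collections import Counter

data = ['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink', '1 2 3 4']

# countByKey(): count pairs per key, ignoring the values.
pairs = [(1, x) for x in data]          # same shape as rdd.map(lambda x: (1, x))
by_key = Counter(k for k, _ in pairs)
print(dict(by_key))                      # {1: 4}

# countByValue(): count occurrences of each whole element.
by_value = Counter(data)
print(dict(by_value))
```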

## **Transformation Commands**
### 1) map() - applies a function to each element of the RDD

In [14]:
rdd.map(lambda x: x.split()).collect()

[['1', '2', '3', '4'],
 ['Apple', 'Bannana', 'Kiwi', 'Dragonfruit'],
 ['Red', 'Yellow', 'Green', 'Pink'],
 ['1', '2', '3', '4']]

In [15]:
rdd.map(lambda x: x+' NA').collect()

['1 2 3 4 NA',
 'Apple Bannana Kiwi Dragonfruit NA',
 'Red Yellow Green Pink NA',
 '1 2 3 4 NA']

In [16]:
def split(x):
    return x.split()

rdd.map(split).collect()

[['1', '2', '3', '4'],
 ['Apple', 'Bannana', 'Kiwi', 'Dragonfruit'],
 ['Red', 'Yellow', 'Green', 'Pink'],
 ['1', '2', '3', '4']]

### 2) flatMap() - like map(), but flattens each returned iterable into individual elements

In [17]:
rdd.flatMap(split).collect()

['1',
 '2',
 '3',
 '4',
 'Apple',
 'Bannana',
 'Kiwi',
 'Dragonfruit',
 'Red',
 'Yellow',
 'Green',
 'Pink',
 '1',
 '2',
 '3',
 '4']
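
map() and flatMap() differ only in how they use the function's return value. map() keeps each returned list as one element, while flatMap() splices the items of each returned iterable into the output. Locally, a list comprehension plays the role of map and `itertools.chain.from_iterable` plays the role of flatMap. This is an illustration on a plain list, not Spark code.

```python
from itertools import chain

data = ['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink', '1 2 3 4']

def split(x):
    return x.split()

mapped = [split(x) for x in data]                   # like rdd.map(split): 4 lists
flat = list(chain.from_iterable(map(split, data)))  # like rdd.flatMap(split): 16 words

print(len(mapped), len(flat))  # 4 16
```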

### 3) filter() - keeps only the elements for which the condition returns True

In [18]:
rdd.filter(lambda x: x == '1 2 3 4' or x == 'Red Yellow Green Pink').collect()

['1 2 3 4', 'Red Yellow Green Pink', '1 2 3 4']

In [19]:
def condition(x):
  if x == 'Red Yellow Green Pink':
    return True
  elif x == '1 2 3 4':
    return True
  else:
    return False

rdd.filter(condition).collect()

['1 2 3 4', 'Red Yellow Green Pink', '1 2 3 4']

### 4) distinct() - returns the unique elements

In [20]:
rdd.distinct().collect()

['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink']
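
distinct() needs a shuffle, because equal elements may start out in different partitions. PySpark implements it roughly as mapping each element to a `(x, None)` pair, reducing by key, and keeping the keys. The sketch below emulates that idea on a local list with a hypothetical `local_distinct` helper. Python's built-in `hash()` stands in for Spark's partitioner, and `num_partitions` is an assumed parameter. Because the result is grouped by hash bucket, it does not follow the input order, which is also why the order of `rdd.distinct().collect()` should not be relied on.

```python
data = ['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink', '1 2 3 4']

def local_distinct(items, num_partitions=2):
    """Hypothetical emulation of distinct(): hash-partition elements, keep one copy each."""
    # One dict per output partition; dict keys drop duplicates.
    buckets = [{} for _ in range(num_partitions)]
    for x in items:
        buckets[hash(x) % num_partitions][x] = None
    # Concatenate the partitions, as collect() would.
    return [x for bucket in buckets for x in bucket]

print(local_distinct(data))
```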

### 5) groupByKey()
Note: groupByKey() works only on a pair RDD, i.e. one whose elements are (key, value) tuples, so we create one first.

In [21]:
rdd1 = rdd.map(lambda x: (x,1))
rdd1.collect()

[('1 2 3 4', 1),
 ('Apple Bannana Kiwi Dragonfruit', 1),
 ('Red Yellow Green Pink', 1),
 ('1 2 3 4', 1)]

In [22]:
rdd1.groupByKey().collect()

[('1 2 3 4', <pyspark.resultiterable.ResultIterable at 0x7fd51a435640>),
 ('Apple Bannana Kiwi Dragonfruit',
  <pyspark.resultiterable.ResultIterable at 0x7fd51a3cc8e0>),
 ('Red Yellow Green Pink',
  <pyspark.resultiterable.ResultIterable at 0x7fd51a3cc790>)]

In [23]:
rdd1.groupByKey().mapValues(list).collect()

[('1 2 3 4', [1, 1]),
 ('Apple Bannana Kiwi Dragonfruit', [1]),
 ('Red Yellow Green Pink', [1])]
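
groupByKey() shuffles every (key, value) pair so that all values for a key end up together. It returns one iterable per key, and mapValues(list) only materializes those iterables so they can be printed. A local dictionary of lists reproduces the grouped result. The sketch assumes all pairs fit in a single process.

```python
from collections import defaultdict

data = ['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink', '1 2 3 4']
pairs = [(x, 1) for x in data]   # same as rdd1 = rdd.map(lambda x: (x, 1))

# Collect every value under its key.
groups = defaultdict(list)
for key, value in pairs:
    groups[key].append(value)

print(list(groups.items()))
# [('1 2 3 4', [1, 1]), ('Apple Bannana Kiwi Dragonfruit', [1]), ('Red Yellow Green Pink', [1])]
```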

### 6) reduceByKey()

In [24]:
rdd1.reduceByKey(lambda x,y: x+y).collect()

[('1 2 3 4', 2),
 ('Apple Bannana Kiwi Dragonfruit', 1),
 ('Red Yellow Green Pink', 1)]

In [25]:
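# Caution: subtraction is neither associative nor commutative, so this result
# can change with the number of partitions
rdd1.reduceByKey(lambda x,y: x-y).collect()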

[('1 2 3 4', 0),
 ('Apple Bannana Kiwi Dragonfruit', 1),
 ('Red Yellow Green Pink', 1)]
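
reduceByKey() first combines values inside each partition and then merges the per-partition results. For this reason the function should be associative and commutative. Addition satisfies both properties, but subtraction does not, so the `x-y` result above depends on how the values happened to be split across partitions. The sketch below makes this concrete on plain lists. It uses a hypothetical `reduce_partitions` helper to model the combine-then-merge step for a single key.

```python
from functools import reduce

def reduce_partitions(partitions, func):
    """Hypothetical model of reduceByKey for one key: reduce each partition, then merge."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

add = lambda x, y: x + y
sub = lambda x, y: x - y

# The same three values for one key, split two different ways.
one_partition = [[1, 1, 1]]
two_partitions = [[1], [1, 1]]

print(reduce_partitions(one_partition, add), reduce_partitions(two_partitions, add))  # 3 3
print(reduce_partitions(one_partition, sub), reduce_partitions(two_partitions, sub))  # -1 1
```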

In [26]:
# Word count
rdd.flatMap(lambda x: x.split()).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).collect()

[('1', 2),
 ('4', 2),
 ('Apple', 1),
 ('Bannana', 1),
 ('Kiwi', 1),
 ('Yellow', 1),
 ('Green', 1),
 ('Pink', 1),
 ('2', 2),
 ('3', 2),
 ('Dragonfruit', 1),
 ('Red', 1)]
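
The word-count chain splits each line into words, pairs every word with 1, and sums the 1s per word. On a local list, `collections.Counter` produces the same totals, which makes it a handy way to check a small Spark result. Because the order of the pairs returned by Spark's collect() is not guaranteed, compare counts rather than positions.

```python
from collections import Counter

data = ['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink', '1 2 3 4']

# Same split -> (word, 1) -> sum pipeline, done in one pass.
word_counts = Counter(word for line in data for word in line.split())

print(word_counts['1'], word_counts['Apple'])  # 2 1
print(len(word_counts))                        # 12
```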

### 7) getNumPartitions() and repartition()

In [27]:
rdd.getNumPartitions() # Number of partitions the RDD is split into

2
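
No `numSlices` was passed to `parallelize()`, so the 2 partitions most likely reflect the runtime's default parallelism. The sketch below gives a simplified picture of how a list is cut into contiguous, nearly equal slices. Its index arithmetic follows Spark's `ParallelCollectionRDD` slicing, but PySpark's serialization batching may shift the exact boundaries, so treat it as an approximation. The helper names `slice_positions` and `local_parallelize` are hypothetical. In Spark itself, `sc.parallelize(data, 5).glom().collect()` shows the actual partition contents.

```python
def slice_positions(length, num_slices):
    """Hypothetical helper: (start, end) index ranges for contiguous, nearly equal slices."""
    return [((i * length) // num_slices, ((i + 1) * length) // num_slices)
            for i in range(num_slices)]

def local_parallelize(items, num_slices):
    # Cut the list into num_slices contiguous chunks (some may be empty).
    return [items[start:end] for start, end in slice_positions(len(items), num_slices)]

data = ['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink', '1 2 3 4']

print(local_parallelize(data, 2))                     # two partitions of two lines each
print([len(p) for p in local_parallelize(data, 5)])  # [0, 1, 1, 1, 1]
```

Asking for more partitions than there are elements, as `repartition(5)` does below, necessarily leaves at least one partition empty.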

In [28]:
# repartition() shuffles the data into the requested number of partitions
rdd = rdd.repartition(5)
rdd.getNumPartitions()

5

### 8) coalesce()

In [29]:
# coalesce() reduces the number of partitions, avoiding a full shuffle
rdd = rdd.coalesce(3)
rdd.getNumPartitions()

3

### 9) cartesian() - returns every (a, b) pair with a from the first RDD and b from the second

In [30]:
rdd3 = sc.parallelize(range(4))
rdd3.collect()

[0, 1, 2, 3]

In [31]:
rdd3.cartesian(rdd).collect()

[(0, '1 2 3 4'),
 (0, 'Apple Bannana Kiwi Dragonfruit'),
 (1, '1 2 3 4'),
 (1, 'Apple Bannana Kiwi Dragonfruit'),
 (0, 'Red Yellow Green Pink'),
 (0, '1 2 3 4'),
 (1, 'Red Yellow Green Pink'),
 (1, '1 2 3 4'),
 (2, '1 2 3 4'),
 (2, 'Apple Bannana Kiwi Dragonfruit'),
 (3, '1 2 3 4'),
 (3, 'Apple Bannana Kiwi Dragonfruit'),
 (2, 'Red Yellow Green Pink'),
 (2, '1 2 3 4'),
 (3, 'Red Yellow Green Pink'),
 (3, '1 2 3 4')]
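
cartesian() pairs every element of the first RDD with every element of the second. The result therefore has len(a) × len(b) elements (4 × 4 = 16 here) and grows quickly, which makes it expensive on large RDDs. `itertools.product` is the local analogue. It returns pairs in nested-loop order, whereas Spark's order follows the partitions, as the output above shows.

```python
from itertools import product

numbers = list(range(4))
data = ['1 2 3 4', 'Apple Bannana Kiwi Dragonfruit', 'Red Yellow Green Pink', '1 2 3 4']

pairs = list(product(numbers, data))
print(len(pairs))  # 16
print(pairs[0])    # (0, '1 2 3 4')
```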

### 10) join() - inner join: for each key present in both RDDs, returns (key, (v1, v2)) for every pair of matching values

In [32]:
rdd4 = sc.parallelize(range(21)).map(lambda x: (x,1))
rdd4.collect()

[(0, 1),
 (1, 1),
 (2, 1),
 (3, 1),
 (4, 1),
 (5, 1),
 (6, 1),
 (7, 1),
 (8, 1),
 (9, 1),
 (10, 1),
 (11, 1),
 (12, 1),
 (13, 1),
 (14, 1),
 (15, 1),
 (16, 1),
 (17, 1),
 (18, 1),
 (19, 1),
 (20, 1)]

In [33]:
rdd5 = sc.parallelize(range(0,21,2)).map(lambda x: (x,1))
rdd5.collect()

[(0, 1),
 (2, 1),
 (4, 1),
 (6, 1),
 (8, 1),
 (10, 1),
 (12, 1),
 (14, 1),
 (16, 1),
 (18, 1),
 (20, 1)]

In [34]:
rdd6 = rdd4.join(rdd5)
rdd6.collect()

[(0, (1, 1)),
 (4, (1, 1)),
 (8, (1, 1)),
 (12, (1, 1)),
 (16, (1, 1)),
 (20, (1, 1)),
 (2, (1, 1)),
 (6, (1, 1)),
 (10, (1, 1)),
 (14, (1, 1)),
 (18, (1, 1))]
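
join() is an inner join. Keys present in only one RDD are dropped (here, the odd numbers from rdd4). When a key has several values on either side, every combination is emitted. Each key appears once in both rdd4 and rdd5, so every result above is (k, (1, 1)). The local sketch below uses a hypothetical `local_join` helper and a small made-up pair list with a repeated key to show the all-pairs behaviour.

```python
from collections import defaultdict

def local_join(left, right):
    """Hypothetical inner join on lists of (key, value) pairs."""
    # Index the right side by key.
    right_index = defaultdict(list)
    for k, v in right:
        right_index[k].append(v)
    # Emit one (key, (left_value, right_value)) per matching combination.
    return [(k, (lv, rv)) for k, lv in left for rv in right_index.get(k, [])]

left = [('a', 1), ('a', 2), ('b', 3)]
right = [('a', 'x'), ('a', 'y'), ('c', 'z')]
print(local_join(left, right))
# [('a', (1, 'x')), ('a', (1, 'y')), ('a', (2, 'x')), ('a', (2, 'y'))]
```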

### 11) sortByKey() - sorts a pair RDD by key (passing False sorts in descending order)

In [35]:
rdd6.sortByKey(False).collect()

[(20, (1, 1)),
 (18, (1, 1)),
 (16, (1, 1)),
 (14, (1, 1)),
 (12, (1, 1)),
 (10, (1, 1)),
 (8, (1, 1)),
 (6, (1, 1)),
 (4, (1, 1)),
 (2, (1, 1)),
 (0, (1, 1))]
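
`sortByKey(False)` is the positional form of `sortByKey(ascending=False)`. It orders pairs by key only and leaves the values untouched. PySpark also accepts a `keyfunc` argument that transforms each key before comparison, for example `sortByKey(keyfunc=lambda k: k.lower())` for a case-insensitive sort. Locally, `sorted` with a key on the first tuple field gives the same orderings, as sketched below on a copy of rdd6's contents and on a small made-up list of string keys.

```python
pairs = [(k, (1, 1)) for k in range(0, 21, 2)]   # same contents as rdd6

# Descending by key, like sortByKey(False).
descending = sorted(pairs, key=lambda kv: kv[0], reverse=True)
print([k for k, _ in descending])  # [20, 18, 16, 14, 12, 10, 8, 6, 4, 2, 0]

# Transforming keys before comparison, like sortByKey(keyfunc=lambda k: k.lower()).
words = [('apple', 1), ('Banana', 2), ('cherry', 3)]
print([k for k, _ in sorted(words, key=lambda kv: kv[0])])          # ['Banana', 'apple', 'cherry']
print([k for k, _ in sorted(words, key=lambda kv: kv[0].lower())])  # ['apple', 'Banana', 'cherry']
```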

### 12) union() - combines two RDDs, keeping duplicates

In [36]:
rdd7 = sc.parallelize([0, True, 'fire', 'water', 2, 6, 'air'])
rdd8 = sc.parallelize(['fire', 0, 1 ,3, 'light', 'water', True, False])

rdd7.union(rdd8).collect()

[0,
 True,
 'fire',
 'water',
 2,
 6,
 'air',
 'fire',
 0,
 1,
 3,
 'light',
 'water',
 True,
 False]

### 13) intersection() - returns the distinct elements present in both RDDs

In [37]:
rdd7.intersection(rdd8).collect()

[0, 'fire', 'water', True]
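
union() concatenates the two RDDs without a shuffle, so duplicates such as 'fire' and 'water' appear twice. intersection() returns each common element once and needs a shuffle. One Python detail matters here: `True == 1` and `0 == False`, and each pair also has equal hashes. As a result, Spark effectively treats them as the same element when matching, which is why `True` appears in the result even though rdd7 contains no `1`. The local sketch below uses list concatenation for union and set intersection for intersection. Sets follow the same equality rules, and the order of the intersection is arbitrary.

```python
rdd7_data = [0, True, 'fire', 'water', 2, 6, 'air']
rdd8_data = ['fire', 0, 1, 3, 'light', 'water', True, False]

union_result = rdd7_data + rdd8_data          # keeps duplicates, like union()
common = set(rdd7_data) & set(rdd8_data)      # each shared element once, like intersection()

print(len(union_result))                 # 15
print(len(common))                       # 4
print(True == 1, hash(True) == hash(1))  # True True
```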