Installing pyspark 

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
     |████████████████████████████████| 212.3MB 66kB/s 
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 40.5MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=02fb90b628bcf70ad2ed293ef89f5a83ecc012d17d481f8af15261e39f69a1cf
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


Mounting Google Drive

In [2]:
from google.colab import drive

drive.mount("/content/gdrive")

Mounted at /content/gdrive


Initializing the Spark session

In [9]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('RDD_Actions').getOrCreate()

Referencing a dataset in an external storage system (Google Drive) to create an RDD

In [4]:
distFile = spark.sparkContext.textFile("/content/gdrive/My Drive/Colab Notebooks/data/dataset.txt")

**collect()**
Returns all elements of the RDD to the driver program as a Python list. Avoid this action on large datasets (for example, millions of records): the entire dataset must fit in the driver's memory, so the driver may run out of memory.


In [5]:
print(distFile.collect())

['10\t5\t8\t9\t8', '1\t17\t3\t6\t7', '23\t19\t5\t2\t15', '4\t20\t9\t2\t12', '18\t8\t22\t5\t7']


In [6]:
for element in distFile.collect():
    print(element)

10	5	8	9	8
1	17	3	6	7
23	19	5	2	15
4	20	9	2	12
18	8	22	5	7


Let's convert this RDD of tab-separated lines into a new RDD of integers so that we can do computations on it.

In [11]:
numbers = distFile.flatMap(lambda line: line.split("\t")) 
validNumbers = numbers.map(lambda number: int(number))
print(validNumbers.collect())

[10, 5, 8, 9, 8, 1, 17, 3, 6, 7, 23, 19, 5, 2, 15, 4, 20, 9, 2, 12, 18, 8, 22, 5, 7]
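The difference between map() and flatMap() can be seen in a plain-Python sketch that needs no Spark. It applies list comprehensions to a hard-coded copy of the five lines printed by collect() above and models only the per-element behaviour, not partitioning. In Spark, `distFile.map(lambda line: line.split("\t"))` would give an RDD of 5 lists, while flatMap() gives an RDD of 25 strings.

```python
# Hard-coded copy of the five tab-separated lines printed by distFile.collect()
lines = ['10\t5\t8\t9\t8', '1\t17\t3\t6\t7', '23\t19\t5\t2\t15',
         '4\t20\t9\t2\t12', '18\t8\t22\t5\t7']

# map(): exactly one output element per input line, so we get a list of lists
mapped = [line.split("\t") for line in lines]

# flatMap(): each line's list of tokens is flattened into one sequence
flat = [token for line in lines for token in line.split("\t")]

# map(int): convert every string token into an integer
valid_numbers = [int(token) for token in flat]

print(len(mapped), len(flat))  # 5 25
print(valid_numbers[:5])       # [10, 5, 8, 9, 8]
```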


To use pair RDDs with actions, let's create a new RDD of (number, 1) pairs.

In [12]:
pairs = validNumbers.map(lambda s: (s, 1))
print(pairs.collect())

[(10, 1), (5, 1), (8, 1), (9, 1), (8, 1), (1, 1), (17, 1), (3, 1), (6, 1), (7, 1), (23, 1), (19, 1), (5, 1), (2, 1), (15, 1), (4, 1), (20, 1), (9, 1), (2, 1), (12, 1), (18, 1), (8, 1), (22, 1), (5, 1), (7, 1)]


**count()**
As the name suggests, returns the number of elements in the RDD.


In [13]:
print("Count is "+str(validNumbers.count()))
print("Count is "+str(pairs.count()))

Count is 25
Count is 25


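**countApprox()**
countApprox(timeout, confidence=0.95) is an approximate version of count(): it returns a potentially incomplete result if the job has not finished within `timeout` milliseconds (1200 ms below). On a dataset this small the job finishes in time, so the result equals the exact count.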


In [14]:
print("countApprox is "+str(validNumbers.countApprox(1200)))
print("countApprox is "+str(pairs.countApprox(1200)))

countApprox is 25
countApprox is 25


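**countApproxDistinct()**
Returns an approximate number of distinct elements in the RDD: an element that appears two or more times in the dataset is counted only once. The estimate is based on the HyperLogLog algorithm, and its accuracy is controlled by the relativeSD parameter (default 0.05). Because the result is an estimate, it can differ from the exact value: in the output below it reports 17 distinct integers, although countByValue() shows there are actually 18.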


In [15]:
print("countApproxDistinct is "+str(validNumbers.countApproxDistinct()))
print("countApproxDistinct is "+str(pairs.countApproxDistinct()))

countApproxDistinct is 17
countApproxDistinct is 18
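For comparison, the exact number of distinct values can be computed with a Python set. This plain-Python sketch uses a hard-coded copy of the 25 integers printed earlier (no Spark needed); the Spark calls in the comments are the exact alternative `distinct().count()` and a tighter `relativeSD`, both of which cost more than the default estimate.

```python
# Hard-coded copy of the 25 integers printed by validNumbers.collect()
valid_numbers = [10, 5, 8, 9, 8, 1, 17, 3, 6, 7, 23, 19, 5, 2, 15,
                 4, 20, 9, 2, 12, 18, 8, 22, 5, 7]
pairs = [(n, 1) for n in valid_numbers]

# A set keeps each value once, so its size is the exact distinct count
exact_distinct = len(set(valid_numbers))
exact_distinct_pairs = len(set(pairs))
print(exact_distinct, exact_distinct_pairs)  # 18 18

# Spark equivalents (not executed here):
#   validNumbers.distinct().count()                   # exact, needs a shuffle
#   validNumbers.countApproxDistinct(relativeSD=0.01) # tighter estimate, more memory
```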


**countByValue()**
Returns the count of each unique value in the RDD as a dictionary of (value, count) pairs.

In [16]:
print("countByValue :  "+str(validNumbers.countByValue()))
print("countByValue :  "+str(pairs.countByValue()))

countByValue :  defaultdict(<class 'int'>, {10: 1, 5: 3, 8: 3, 9: 2, 1: 1, 17: 1, 3: 1, 6: 1, 7: 2, 23: 1, 19: 1, 2: 2, 15: 1, 4: 1, 20: 1, 12: 1, 18: 1, 22: 1})
countByValue :  defaultdict(<class 'int'>, {(10, 1): 1, (5, 1): 3, (8, 1): 3, (9, 1): 2, (1, 1): 1, (17, 1): 1, (3, 1): 1, (6, 1): 1, (7, 1): 2, (23, 1): 1, (19, 1): 1, (2, 1): 2, (15, 1): 1, (4, 1): 1, (20, 1): 1, (12, 1): 1, (18, 1): 1, (22, 1): 1})
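countByValue() builds the same value-to-count mapping as Python's `collections.Counter`. The plain-Python sketch below shows that equivalence on a hard-coded copy of the data (no Spark needed). Keep in mind that in Spark the whole dictionary is returned to the driver, so this action suits data with a modest number of distinct values.

```python
from collections import Counter

# Hard-coded copy of the 25 integers printed by validNumbers.collect()
valid_numbers = [10, 5, 8, 9, 8, 1, 17, 3, 6, 7, 23, 19, 5, 2, 15,
                 4, 20, 9, 2, 12, 18, 8, 22, 5, 7]

# Counter maps each distinct value to the number of times it occurs
counts = Counter(valid_numbers)
print(counts[5], counts[8], counts[9])  # 3 3 2

# Sorting by value makes the mapping easier to read
by_value = dict(sorted(counts.items()))
print(list(by_value.items())[:3])       # [(1, 1), (2, 2), (3, 1)]
```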


**first()**
Returns the first element of the dataset.


In [18]:
print("first element :  "+str(validNumbers.first()))
print("first element :  "+str(pairs.first()))

first element :  10
first element :  (10, 1)


**min()**
As the action name suggests, this returns the minimum element in the dataset.

In [19]:
print("min is  "+str(validNumbers.min()))
print("min is  "+str(pairs.min()))

min is  1
min is  (1, 1)


**max()**
Returns the maximum element in the dataset.


In [20]:
print("max is  "+str(validNumbers.max()))
print("max is  "+str(pairs.max()))

max is  23
max is  (23, 1)
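min() and max() on an RDD rely on Python's ordering of the elements, so the (number, 1) pairs are compared lexicographically: first by the number, and only on a tie by the second component. Both RDD methods also accept an optional `key` argument (for example `validNumbers.max(key=lambda x: -x)`), which behaves like the `key` argument of Python's built-in `max()`. The plain-Python sketch below, on hard-coded data, illustrates both points.

```python
# Hard-coded copy of the 25 integers printed by validNumbers.collect()
valid_numbers = [10, 5, 8, 9, 8, 1, 17, 3, 6, 7, 23, 19, 5, 2, 15,
                 4, 20, 9, 2, 12, 18, 8, 22, 5, 7]
pairs = [(n, 1) for n in valid_numbers]

# Tuples compare element by element; here the first component decides
smallest_pair, largest_pair = min(pairs), max(pairs)
print(smallest_pair, largest_pair)  # (1, 1) (23, 1)

# The second component only matters when the first components are equal
print((5, 1) < (5, 2))  # True

# A key function changes what is compared: maximizing -x picks the smallest number
largest_by_key = max(valid_numbers, key=lambda x: -x)
print(largest_by_key)  # 1
```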


**top()**
top(n) returns the n largest elements of the RDD, sorted in descending order.


In [21]:
print("top : "+str(validNumbers.top(3)))
print("top : "+str(pairs.top(3)))

top : [23, 22, 20]
top : [(23, 1), (22, 1), (20, 1)]
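Why top(n) sends only n elements to the driver can be sketched in plain Python: each partition keeps its own n largest values with `heapq.nlargest`, and the short per-partition lists are then merged. The two-partition split below is hypothetical (Spark decides the real partitioning), and `top_n` is an illustrative helper, not a Spark API. As far as I know this mirrors the per-partition strategy PySpark uses, but treat it as a model rather than the library's code.

```python
import heapq
from functools import reduce

# Hard-coded copy of the 25 integers printed by validNumbers.collect()
valid_numbers = [10, 5, 8, 9, 8, 1, 17, 3, 6, 7, 23, 19, 5, 2, 15,
                 4, 20, 9, 2, 12, 18, 8, 22, 5, 7]

# Hypothetical split into two partitions
partitions = [valid_numbers[:12], valid_numbers[12:]]

def top_n(parts, num, key=None):
    """Model of top(num): local top-num per partition, then a merge of the small lists."""
    # Each partition keeps only its own `num` largest elements
    local = [heapq.nlargest(num, p, key=key) for p in parts]
    # Merge the per-partition candidates pairwise, keeping `num` at each step
    return reduce(lambda a, b: heapq.nlargest(num, a + b, key=key), local)

print(top_n(partitions, 3))  # [23, 22, 20]
```

With this split, the first partition contributes [23, 19, 17] and the second [22, 20, 18], so at most six candidates are ever merged, regardless of how large the partitions are.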


**takeOrdered()**
Similar to top(), takeOrdered(n) returns n elements, but it takes the n smallest elements in ascending order (or in the order defined by an optional key function).
Use top() and takeOrdered() only when n is small, because all n returned elements are loaded into the driver's memory. Unlike collect(), they do not bring the whole dataset to the driver.


In [22]:
print("takeOrdered : "+ str(validNumbers.takeOrdered(2)))
print("takeOrdered : "+ str(pairs.takeOrdered(2)))

takeOrdered : [1, 2]
takeOrdered : [(1, 1), (2, 1)]
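takeOrdered() also accepts an optional `key` function, which lets you choose the ordering. Python's `heapq.nsmallest` has the same (n, iterable, key) shape, so the plain-Python sketch below (hard-coded data, no Spark required) shows what `takeOrdered(2)` and `takeOrdered(3, key=lambda x: -x)` would return on this dataset.

```python
import heapq

# Hard-coded copy of the 25 integers printed by validNumbers.collect()
valid_numbers = [10, 5, 8, 9, 8, 1, 17, 3, 6, 7, 23, 19, 5, 2, 15,
                 4, 20, 9, 2, 12, 18, 8, 22, 5, 7]

# Default ordering: the n smallest values in ascending order
smallest_two = heapq.nsmallest(2, valid_numbers)
print(smallest_two)   # [1, 2]

# Negating the key reverses the ordering, giving the same result as top(3)
largest_three = heapq.nsmallest(3, valid_numbers, key=lambda x: -x)
print(largest_three)  # [23, 22, 20]

# Spark equivalents (not executed here):
#   validNumbers.takeOrdered(2)
#   validNumbers.takeOrdered(3, key=lambda x: -x)
```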


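**reduce()**
Reduces the elements of the RDD to a single value using the given binary operator. The operator must be commutative and associative, because each partition is reduced independently and the partial results are then combined.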

In [23]:
from operator import add
redRes=validNumbers.reduce(add)
print(redRes)

245
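The commutative-and-associative requirement comes from how the work is split: each partition is reduced on its own, and the partial results are then reduced again. The plain-Python sketch below models that two-step process with a hypothetical helper `spark_style_reduce` and two hypothetical partitionings of `[1, 2, 3, 4]`; it is a simplification, not Spark's code. Addition gives the same answer regardless of the split, while subtraction does not.

```python
from functools import reduce
from operator import add, sub

def spark_style_reduce(f, partitions):
    """Model of RDD.reduce: reduce each non-empty partition, then reduce the partials."""
    partials = [reduce(f, p) for p in partitions if p]
    return reduce(f, partials)

one_part = [[1, 2, 3, 4]]
two_parts = [[1, 2], [3, 4]]

# Addition is commutative and associative: same result for any partitioning
print(spark_style_reduce(add, one_part), spark_style_reduce(add, two_parts))  # 10 10

# Subtraction is not: ((1-2)-3)-4 = -8, but (1-2) - (3-4) = 0
print(spark_style_reduce(sub, one_part), spark_style_reduce(sub, two_parts))  # -8 0
```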


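**treeReduce()**
Reduces the elements of this RDD in a multi-level tree pattern. For a commutative and associative operator the result is the same as reduce(), but partial results are combined in intermediate stages (the depth parameter, default 2, sets the number of levels) instead of all at once on the driver.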

In [24]:
add = lambda x, y: x + y
redRes=validNumbers.treeReduce(add)
print(redRes)

245
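The tree pattern can be illustrated with a simplified plain-Python model: reduce every partition, then combine neighbouring partial results level by level. The helper `tree_reduce` and the one-partition-per-line split are hypothetical. Spark's real implementation groups partitions differently and stops at a fixed `depth` (default 2), whereas this model keeps combining pairs until one value remains.

```python
from functools import reduce
from operator import add

def tree_reduce(f, partitions):
    """Simplified model: reduce partitions, then combine partial results pairwise per level."""
    level = [reduce(f, p) for p in partitions if p]
    print("level 0:", level)
    depth = 0
    while len(level) > 1:
        # Combine neighbours two at a time; an odd element is carried over unchanged
        level = [reduce(f, level[i:i + 2]) for i in range(0, len(level), 2)]
        depth += 1
        print(f"level {depth}:", level)
    return level[0]

# Hypothetical partitioning: one partition per line of the input file
partitions = [[10, 5, 8, 9, 8], [1, 17, 3, 6, 7], [23, 19, 5, 2, 15],
              [4, 20, 9, 2, 12], [18, 8, 22, 5, 7]]
result = tree_reduce(add, partitions)
# level 0: [40, 34, 64, 47, 60]
# level 1: [74, 111, 60]
# level 2: [185, 60]
# level 3: [245]
```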


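**aggregate()**
Aggregates the elements of each partition using seqOp, starting from a neutral "zero value", and then combines the per-partition results using combOp (again starting from the zero value). Unlike reduce(), the result type may differ from the element type, because seqOp merges an element into an accumulator while combOp merges two accumulators.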


In [25]:
seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)
agg=validNumbers.aggregate(0, seqOp, combOp)
print(agg)

245
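Because the accumulator type may differ from the element type, aggregate() can compute a (sum, count) pair in a single pass, from which the mean follows. The plain-Python sketch below models the two phases with a hypothetical `aggregate` helper and a hypothetical one-partition-per-line split; in Spark, `validNumbers.aggregate((0, 0), seq_op, comb_op)` should give the same (245, 25) for any partitioning, since these operations are associative and commutative and (0, 0) is neutral.

```python
from functools import reduce

def aggregate(partitions, zero, seq_op, comb_op):
    """Model of RDD.aggregate: fold each partition from `zero`, then merge from `zero`."""
    # Phase 1: fold every partition separately, each starting from the zero value
    partials = [reduce(seq_op, p, zero) for p in partitions]
    # Phase 2: merge the per-partition accumulators, starting again from the zero value
    return reduce(comb_op, partials, zero)

# Hypothetical partitioning: one partition per line of the input file
partitions = [[10, 5, 8, 9, 8], [1, 17, 3, 6, 7], [23, 19, 5, 2, 15],
              [4, 20, 9, 2, 12], [18, 8, 22, 5, 7]]

# seq_op adds one element to a (sum, count) accumulator
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
# comb_op merges two (sum, count) accumulators
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

total, count = aggregate(partitions, (0, 0), seq_op, comb_op)
print(total, count, total / count)  # 245 25 9.8
```

Since the zero value is used once per partition and once more during the merge, it must be neutral for the operations. With addition and a zero value of 1 instead of 0, this five-partition sketch returns 251 rather than 245, and the error would change with the number of partitions.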
