In [88]:
!pip install pyspark



In [89]:
# import library for pyspark
import pyspark

In [90]:
# To create SparkContext
from pyspark import SparkContext

In [91]:
sc = SparkContext()

In [92]:
# Cannot run multiple SparkContexts at once;
## sc1 =SparkContext()

In [93]:
# sc1.stop()

# How to create RDDs

# Methods of RDDs
1. from variable
2. from RDD
3. from External Datasets

In [94]:
#from passing variables use parallelize 
x = [1,2,3,4,5]
y = sc.parallelize(x)

In [95]:
y

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287

In [96]:
y.collect() # Action

[1, 2, 3, 4, 5]

In [97]:
# List object has no attribute 'collect'
## x.collect()

# Transformation Functions

# map()

In [98]:
#It applies to each element of RDD and it returns the result as new RDD. ... 
#Map transforms an RDD of length N into another RDD of length N. 
#The input and output RDDs will typically have the same number of records.

In [99]:
a1 = ["b","a","c"]
x = sc.parallelize(a1)

In [100]:
x

ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:287

In [101]:
y = x.map(lambda z: (z, 1))

In [102]:
y.collect()

                                                                                

[('b', 1), ('a', 1), ('c', 1)]

# flatmap()

In [103]:
#It applies to each element of RDD and it returns the result as new RDD. 
#It is similar to Map, but FlatMap allows returning 0, 1 or more elements from map function.

In [104]:
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, x*100, 42))


In [105]:
y.collect()

[1, 100, 42, 2, 200, 42, 3, 300, 42]

In [106]:
d1 = ["This is a FlatMap operation in PySpark"] 
rdd1 = sc.parallelize(d1)
rdd2 = rdd1.flatMap(lambda x: x.split(" "))


In [107]:
rdd2.collect()

['This', 'is', 'a', 'FlatMap', 'operation', 'in', 'PySpark']

In [108]:
y = rdd2.map(lambda z: (z, 1))

In [109]:
y.collect()

[('This', 1),
 ('is', 1),
 ('a', 1),
 ('FlatMap', 1),
 ('operation', 1),
 ('in', 1),
 ('PySpark', 1)]

In [110]:
rdd2.collect()

['This', 'is', 'a', 'FlatMap', 'operation', 'in', 'PySpark']

# filter()

In [111]:
#It returns an RDD that only has element that pass the condition mentioned in input function.

In [112]:
x = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
y = x.filter(lambda x: x%2 == 1) #keep odd values

In [113]:
y.collect()

[1, 3, 5, 7, 9]

In [114]:
rdd2.collect()

['This', 'is', 'a', 'FlatMap', 'operation', 'in', 'PySpark']

In [115]:
y = rdd2.filter(lambda x: x == 'operation')

In [116]:
y.collect()

['operation']

# distinct()

In [117]:
#It returns a new dataset that contains the distinct elements of the source dataset. 
# It is helpful to remove duplicate data.
# For example, if RDD has elements (Spark, Spark, Hadoop, Flink), 
# then rdd.distinct() will give elements (Spark, Hadoop, Flink).

In [118]:
sc.parallelize(('a','r','a','h','h')).distinct().collect()

['r', 'a', 'h']

In [119]:
# Find max flight in year 2000

# Actions Functions

# count()

In [120]:
#Action count() returns the number of elements in RDD.
sc.parallelize((1, 2, 3, 4)).count()

4

# sum()

In [121]:
# It adds up the value in an RDD.
sc.parallelize((1, 2, 3, 4)).sum()

10

# max()

In [122]:
#Return the maximum value from the dataset.
x = sc.parallelize([2,4,1])


In [123]:
x.max()

4

In [124]:
x.min()

1

# mean()

In [125]:
#Alias for Avg. Returns the average of the values in a column.
x = sc.parallelize([2,4,1])
y = x.mean()

In [126]:
x.mean()

2.3333333333333335

In [127]:
sc.stop()