In [47]:
#!pip install pyspark

In [48]:
from pyspark.sql import SparkSession

In [3]:
# Build (or reuse) a SparkSession named 'Demo' on the "local" master and
# expose its underlying SparkContext as `sc` for the RDD examples below.
spark = (
    SparkSession.builder
    .master("local")
    .appName('Demo')
    .getOrCreate()
)
sc = spark.sparkContext


In [52]:
from pyspark import SparkContext, SparkConf

# Obtain the SparkContext for this notebook, asking for the "local[*]" master
# with the web UI enabled on port "0" (presumably "let Spark pick a free port").
# NOTE(review): getOrCreate presumably returns the context that already exists
# from the SparkSession cell, in which case this conf (local[*], UI settings)
# is not applied — confirm, and if so fold these settings into the builder.
sc = SparkContext.getOrCreate(conf=SparkConf().setMaster("local[*]").set("spark.ui.enabled", "true").set("spark.ui.port", "0"))

In [53]:
sc

In [55]:
# Creating an RDD
# There are 3 ways to create an RDD in Spark:
#   1. parallelize an in-memory collection
#   2. read an external file
#   3. derive it from an existing RDD

# Example of the parallelize method: distribute the list across 3 partitions
rdd1 = sc.parallelize([1, 2, 3, 4, 5],3)

In [7]:
# Return a list that contains all of the elements in this RDD
# Note : This method should only be used if the resulting array is expected to be small,
# as all the data is loaded into the driver’s memory.
rdd1.collect()

[1, 2, 3, 4, 5]

In [8]:
# Return the number of elements in the RDD
rdd1.count()

5

In [9]:
# Find the number of partitions
rdd1.getNumPartitions()

3

In [10]:
# Return an RDD created by coalescing all elements within each partition into a list.
rdd1.glom().collect()

[[1], [2, 3], [4, 5]]

In [11]:
# Save the above created RDD as text files under the 'upgrad_folder' directory.
# saveAsTextFile raises an error when the target directory already exists, so
# remove any output left behind by a previous run to keep this cell re-runnable.
# NOTE(review): the cleanup assumes the path lives on the local filesystem
# (true for this local-mode session); on HDFS use the Hadoop FileSystem API instead.
import shutil

shutil.rmtree('upgrad_folder', ignore_errors=True)
rdd1.saveAsTextFile('upgrad_folder')

In [12]:
# Creating an RDD by reading from a file.
# NOTE(review): the relative path is resolved against Spark's default filesystem —
# HDFS on a typical cluster, presumably the local working directory in this
# local-mode session — confirm for your environment.
rddnew = sc.textFile("sample_data/california_housing_test.csv")
# Preview only the first few lines: collect() would load every line of the
# file into the driver's memory, and a small sample is enough for inspection.
rddnew.take(10)


['"longitude","latitude","housing_median_age","total_rooms","total_bedrooms","population","households","median_income","median_house_value"',
 '-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000',
 '-118.300000,34.260000,43.000000,1510.000000,310.000000,809.000000,277.000000,3.599000,176500.000000',
 '-117.810000,33.780000,27.000000,3589.000000,507.000000,1484.000000,495.000000,5.793400,270500.000000',
 '-118.360000,33.820000,28.000000,67.000000,15.000000,49.000000,11.000000,6.135900,330000.000000',
 '-119.670000,36.330000,19.000000,1241.000000,244.000000,850.000000,237.000000,2.937500,81700.000000',
 '-119.560000,36.510000,37.000000,1018.000000,213.000000,663.000000,204.000000,1.663500,67000.000000',
 '-121.430000,38.630000,43.000000,1009.000000,225.000000,604.000000,218.000000,1.664100,67000.000000',
 '-120.650000,35.480000,19.000000,2310.000000,471.000000,1341.000000,441.000000,3.225000,166900.000000',
 '-122.840000,38.400000,15.0000

In [13]:
# Counting the number of records in the file
rddnew.count()

3001

In [14]:

type(rddnew)

pyspark.rdd.RDD

In [15]:
# Return a new RDD by applying a function to each element of this RDD.

# this is using the lambda functions (anonymous functions)
rdd = sc.parallelize(["b", "a", "c"])
rdd_upper = rdd.map(lambda x: x.upper())
rdd_upper.collect()


['B', 'A', 'C']

In [16]:
# Map ... using a regular function
def upper_case(v):
    """Return an upper-cased copy of the string ``v``."""
    uppercased = v.upper()
    return uppercased
rdd = sc.parallelize(["b", "a", "c"])
rdd.map(upper_case).collect()

['B', 'A', 'C']

In [17]:
#Return a new RDD containing only the elements that satisfy a predicate
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.filter(lambda x: x % 2 == 0)
rdd2.collect()

[2, 4]

In [18]:
#Distinct Return a new RDD containing the distinct elements in this RDD.
sc.parallelize([1, 1, 2, 3]).distinct().collect()

[1, 2, 3]

In [19]:
# distinct() does not guarantee the order of the returned elements.
# To get them in sorted order, sort the collected list on the driver:
sorted(sc.parallelize([1, 4, 2, 3, 2]).distinct().collect())

[1, 2, 3, 4]

In [20]:
#Union Return the union of this RDD and another one.
rdd = sc.parallelize([1, 1, 2, 3])
rdd_union = rdd.union(rdd)
rdd_union.collect()

[1, 1, 2, 3, 1, 1, 2, 3]

In [21]:
#intersection
# Return the intersection of this RDD and another one.
# The output will not contain any duplicate elements, even if the input RDDs did.

# NOTE : This method performs a shuffle internally
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
sorted(rdd1.intersection(rdd2).collect())

[1, 2, 3]

In [22]:
#subtract
#Return each value in self that is not contained in other
x = sc.parallelize([ 1,2,3,4,5])
y = sc.parallelize([2,3,4])
sorted(x.subtract(y).collect())

[1, 5]

In [23]:
#cartesian
#Return the Cartesian product of this RDD and another one, that is,
# the RDD of all pairs of elements (a, b)
#where a is in self and b is in other.
rdd = sc.parallelize([1, 2])
rdd2 = sc.parallelize([3,4])
sorted(rdd.cartesian(rdd2).collect())

[(1, 3), (1, 4), (2, 3), (2, 4)]

In [24]:
###### Action functions ###########

In [25]:
#Collect
#Return a list that contains all of the elements in this RDD.

# NOTE : This method should only be used if the resulting array is expected to be small,
#as all the data is loaded into the driver’s memory.
rdd = sc.parallelize([1, 2, 3, 4])
rdd.collect()

[1, 2, 3, 4]

In [26]:
#Count
#Return the number of elements in this RDD.
sc.parallelize([2, 3, 4]).count()

3

In [27]:
#countByValue
#Return the count of each unique value in this RDD
#as a dictionary of (value, count) pairs.
sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()

dict_items([(1, 2), (2, 3)])

In [28]:
#take(num)
# Take the first num elements of the RDD.

# It works by first scanning one partition, and use the results from
# that partition to estimate the number of additional partitions needed
# to satisfy the limit.

# Translated from the Scala implementation in RDD#take().


#Note this method should only be used if the resulting array is expected to be small,
#as all the data is loaded into the driver’s memory.

sc.parallelize([2, 3, 4, 5, 6]).take(4)


[2, 3, 4, 5]

In [29]:
#top(num)
#Get the top N elements from an RDD.

#Note This method should only be used if the resulting array is
#expected to be small, as all the data is loaded into the driver’s memory.

#Note It returns the list sorted in descending order.

sc.parallelize([2, 3, 4, 5, 6], 2).top(2)

[6, 5]

In [31]:
#reduce(function)
#Reduces the elements of this RDD using the specified
#commutative and associative binary operator.
#Currently reduces partitions locally.
from operator import add
sc.parallelize([1, 2, 3, 4, 5]).reduce(add)



15

In [36]:
# CAUTION: reduce expects a commutative and associative operator. Division is
# neither, so in general the result of a reduce like this can depend on how the
# elements are split across partitions; prefer operators such as add or max.
sc.parallelize([9,3,1]).reduce(lambda x,y :x/y)
# We can pass a custom function to reduce, as shown above.

3.0

In [33]:
#fold
#Aggregate the elements of each partition, and then the results
# for all the partitions, using a given associative function
# and a neutral “zero value.”

# The function op(t1, t2) is allowed to modify t1 and return it as its result
#value to avoid object allocation; however, it should not modify t2.
# from operator import add

sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)



15

In [38]:
sc.parallelize([1, 2, 3, 4, 5]).fold(1, lambda x,y :x*y)

120

In [39]:
#aggregate
# Aggregate the elements of each partition, and then the results for
# all the partitions, using a given combine functions and a neutral
# “zero value.”

# The functions op(t1, t2) is allowed to modify t1 and return it as
# its result value to avoid object allocation; however, it should not modify t2.

# The first function (seqOp) can return a different result type, U,
# than the type of this RDD. Thus, we need one operation for merging
# a T into an U and one operation for merging two U
rdd = sc.parallelize([1, 2, 3, 4])
# seqOp folds one element y into the running accumulator x = (sum, count).
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
# Starting from an accumulator of (0, 0), feeding 1, 2, 3, 4 through seqOp one
# after another yields (1, 1), (3, 2), (6, 3), (10, 4).




In [40]:

# combOp merges two partial (sum, count) accumulators element-wise.
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
# (0,0) is the zero value; the result is (sum of elements, number of elements).
rdd.aggregate((0,0), seqOp, combOp)

(10, 4)

In [42]:
#foreach
def f(x):
    """Print ``x`` to standard output; used as the per-element action for foreach."""
    print(x)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(f)




In [None]:
#####   operations on paired rdd

In [43]:
# Creating paired RDDs
# To work with paired RDDs, each element of the RDD must be a
# (key, value) tuple.
rdd = sc.parallelize(["b", "a", "c"])
pairedrdd= rdd.map(lambda x: (x,1))
pairedrdd.collect()


[('b', 1), ('a', 1), ('c', 1)]

In [None]:
####### Transformation functions on a single paired RDD  #######

In [44]:
#reduceByKey()
# Merge the values for each key using an associative and commutative
# reduce function.

# This will also perform the merging locally on each mapper before sending
# results to a reducer, similarly to a “combiner” in MapReduce.

# Output will be partitioned with numPartitions partitions, or the default
# parallelism level if numPartitions is not specified. Default partitioner
# is hash-partition.
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())

[('a', 2), ('b', 1)]

In [45]:
#groupByKey()
# Group the values for each key in the RDD into a single sequence.
# Hash-partitions the resulting RDD with numPartitions partitions.

# Note: if you are grouping in order to perform an aggregation (such as a sum or
# average) over each key, reduceByKey or aggregateByKey will perform much better.

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
# groupByKey yields (key, ResultIterable) pairs; mapValues(list) materialises each
# group so the output shows the grouped values instead of opaque object reprs.
sorted(rdd.groupByKey().mapValues(list).collect())

[('a', [1, 1]), ('b', [1])]

In [56]:
#mapValues()
#Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD’s partitioning.

x = sc.parallelize([("a", ["Vishwa", "Mohan", "Rishavv"]), ("b", ["Abhinav"])])
def f(x):
    """Return the number of items in ``x``."""
    size = len(x)
    return size
x.mapValues(f).collect()

[('a', 3), ('b', 1)]

In [57]:
#flatMapValues()
#Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD’s partitioning.
x = sc.parallelize([("a", ["Vishwa", "Mohan", "Rishavv"]), ("b", ["Abhinav", "Amit"])])
def f(x):
    """Return ``x`` unchanged, so flatMapValues emits one pair per item of the value."""
    return x
x.flatMapValues(f).collect()


[('a', 'Vishwa'),
 ('a', 'Mohan'),
 ('a', 'Rishavv'),
 ('b', 'Abhinav'),
 ('b', 'Amit')]

In [58]:
#keys()
#Return an RDD with the keys of each tuple.
m = sc.parallelize([(1, 2), (3, 4)]).keys()
m.collect()


[1, 3]

In [59]:
#values()
# Return an RDD with the values of each tuple.
m = sc.parallelize([(1, 2), (3, 4)]).values()
m.collect()

[2, 4]

In [60]:
#sortByKey()
# Sorts this RDD, which is assumed to consist of (key, value) pairs.
tmp = [('Apple', 11), ('Banana', 12), ('Mango', 13), ('Carrot', 14), ('Orange', 15)]
sc.parallelize(tmp).sortByKey(True, 1).collect()


[('Apple', 11), ('Banana', 12), ('Carrot', 14), ('Mango', 13), ('Orange', 15)]

In [61]:
########    Transformation functions on two paired RDD    ##########


In [62]:
#subtractByKey()
#Return each (key, value) pair in self that has no pair with matching key in other.

x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
y = sc.parallelize([("a", 3), ("c", None)])
sorted(x.subtractByKey(y).collect())

[('b', 4), ('b', 5)]

In [63]:
#join()

# Return an RDD containing all pairs of elements with matching keys in self
# and other.

# Each pair of elements will be returned as a (k, (v1, v2)) tuple,
# where (k, v1) is in self and (k, v2) is in other.

# Performs a hash join across the cluster.

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())

[('a', (1, 2)), ('a', (1, 3))]

In [64]:
#rightOuterJoin()

# Perform a right outer join of self and other.

# For each element (k, w) in other, the resulting RDD will either contain
# all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
# if no elements in self have key k.

# Hash-partitions the resulting RDD into the given number of partitions.

rdd1 = sc.parallelize([("a", True), ("b", True)])
rdd2 = sc.parallelize([("a", False)])
sorted(rdd2.rightOuterJoin(rdd1).collect())


[('a', (False, True)), ('b', (None, True))]

In [65]:
#leftOuterJoin()
# Perform a left outer join of self and other.

# For each element (k, v) in self, the resulting RDD will either contain
# all pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no
# elements in other have key k.

# Hash-partitions the resulting RDD into the given number of partitions.

rdd1 = sc.parallelize([("a", True), ("b", True)])
rdd2 = sc.parallelize([("a", False)])
sorted(rdd2.leftOuterJoin(rdd1).collect())

[('a', (False, True))]

In [66]:
#cogroup()
# For each key k in self or other, return a resulting RDD that contains a
# tuple with the list of values for that key in self as well as other.

rdd1 = sc.parallelize([("a", True), ("b", True)])
rdd2 = sc.parallelize([("a", False)])

# rdd1.cogroup(rdd2).collect()  Below will be the result of the cogroup
# [('a',
#   (<pyspark.resultiterable.ResultIterable at 0x7f5c5c0dfed0>,
#    <pyspark.resultiterable.ResultIterable at 0x7f5c5c1c1450>)),
#  ('b',
#   (<pyspark.resultiterable.ResultIterable at 0x7f5c5c07bbd0>,
#    <pyspark.resultiterable.ResultIterable at 0x7f5c5c07bc50>))]


# Sort the cogrouped (key, (values_in_rdd1, values_in_rdd2)) pairs, then turn
# both value iterables of every key into lists so the result is readable.
cogrouped = sorted(rdd1.cogroup(rdd2).collect())
[(key, tuple(list(values) for values in groups)) for key, groups in cogrouped]




[('a', ([True], [False])), ('b', ([True], []))]

In [None]:
#############      Action on paired RDD   ################

In [67]:
#countByKey()
# Count the number of elements for each key, and return the result to
# the master as a dictionary.

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.countByKey().items())

[('a', 2), ('b', 1)]

In [68]:
#lookup(key)
# Return the list of values in the RDD for key key.
# This operation is done efficiently if the RDD has a known partitioner by
# only searching the partition that the key maps to.

l = range(1000)
rdd = sc.parallelize(zip(l, l), 10)
rdd.lookup(42)

[42]