
# RDD

In [2]:
# Distribute a small driver-side list as an RDD.
data = list(range(1, 6))
ParelRDD = sc.parallelize(data)

In [3]:
# collect() is an action: it returns every element to the driver as a Python list.
ParelRDD.collect()

[1, 2, 3, 4, 5]

In [4]:
# reduce() combines elements pairwise until a single value (the total) remains.
ParelRDD.reduce(lambda left, right: left + right)

15

# Reading a Text File

In [5]:
#-----Reading text files----------
# textFile() yields one RDD element per line of the file.
# NOTE(review): machine-specific absolute path — edit DATA_DIR to run elsewhere.
DATA_DIR = 'file:///home/sanjay/SparkTutorial/data'
fileData = sc.textFile(DATA_DIR + '/data3.txt')

In [6]:
# Character count of each line. Named `line_lengths` rather than `length`
# so it is not clobbered by the `length` function defined further down.
line_lengths = fileData.map(len)
line_lengths.collect()


[7, 8, 5, 6, 7]

# Reading Multiple Text Files

In [7]:
#-----Reading multiple files------
# wholeTextFiles() returns one (path, entire_file_contents) pair per file in
# the directory, unlike textFile(), which yields one element per line.
# NOTE(review): machine-specific absolute path — edit DATA_DIR to run elsewhere.
DATA_DIR = 'file:///home/sanjay/SparkTutorial/data'
fileDatas = sc.wholeTextFiles(DATA_DIR)
fileDatas.collect()

[('file:/home/sanjay/SparkTutorial/data/data.txt', '50\n93\n45\n12\n\n'),
 ('file:/home/sanjay/SparkTutorial/data/data1.txt', '23\n85\n45\n75\n\n'),
 ('file:/home/sanjay/SparkTutorial/data/data3.txt',
  'Hi how \nare you \nfine \nsanjay\nprakash\n')]

# Passing Functions to Spark

In [8]:
#--------Passing an external (named) function to Spark----
data2 = list(range(1, 7))
ParelRDD2 = sc.parallelize(data2)

In [9]:
def add(i):
    """Return ``i`` plus 2.

    A plain named function, passed to ``RDD.map`` to show that Spark accepts
    ordinary Python functions as well as lambdas.
    """
    return i + 2
# Apply the named function `add` to every element, then collect the results.
newRDD = ParelRDD2.map(add)
newRDD.collect()

[3, 4, 5, 6, 7, 8]

In [10]:
# NOTE(review): machine-specific absolute path — edit DATA_DIR to run elsewhere.
DATA_DIR = 'file:///home/sanjay/SparkTutorial/data'
fileData2 = sc.textFile(DATA_DIR + '/data3.txt')

In [11]:
def length(s):
    """Return the number of characters in one line of text (used with ``RDD.map``)."""
    n_chars = len(s)
    return n_chars
# Map each line through the named `length` function and collect the counts.
newRDD2 = fileData2.map(length)
newRDD2.collect()

[7, 8, 5, 6, 7]

# Closures and Accumulators

In [12]:
data3 = list(range(1, 6))
ParelRDD3 = sc.parallelize(data3)
ParelRDD3.collect()

[1, 2, 3, 4, 5]

In [13]:
#----Closures-------
# A driver-side global that the function below tries to update.
counter = 0

def increment_count(x):
    """Add ``x`` to the module-level ``counter``.

    Deliberate pitfall: when run through ``RDD.foreach`` the function executes
    against the executors' own copy of ``counter``, so the driver's value is
    not updated (the printed result stays 0). Use an accumulator instead.
    """
    global counter
    counter = counter + x
# Run increment_count on every element, then print the driver's counter.
ParelRDD3.foreach(increment_count)
print("Counter :",counter)

Counter : 0


In [14]:
#---Accumulator---------
# Unlike the global above, an accumulator's updates from foreach() are
# visible on the driver afterwards.
accum = sc.accumulator(0)
ParelRDD3.foreach(lambda value: accum.add(value))
accum.value

15

# Transformations and Actions

# Transformations

## Map

In [16]:
map_data = list('abcde')
ParelRDD4 = sc.parallelize(map_data)

In [17]:
# map() produces exactly one output element per input: here a (letter, 1) pair.
new_RDD3 = ParelRDD4.map(lambda letter: (letter, 1))
print(ParelRDD4.collect(), new_RDD3.collect(), sep='\n')

['a', 'b', 'c', 'd', 'e']
[('a', 1), ('b', 1), ('c', 1), ('d', 1), ('e', 1)]


## Filter

In [18]:
filter_data = list(range(1, 5))
ParelRDD5 = sc.parallelize(filter_data)

In [19]:
# filter() keeps only the elements for which the predicate is true (even numbers).
new_RDD4 = ParelRDD5.filter(lambda n: n % 2 == 0)
print(ParelRDD5.collect(), new_RDD4.collect(), sep='\n')

[1, 2, 3, 4]
[2, 4]


## FlatMap

In [20]:
flatmap_data = list(range(1, 5))
ParelRDD6 = sc.parallelize(flatmap_data)

In [23]:
# map(): one list per element (nested); flatMap(): the lists are flattened.
print(ParelRDD6.map(lambda n: [n + k for k in (0, 10, 20)]).collect())
print(ParelRDD6.flatMap(lambda n: [n + k for k in (0, 10, 20)]).collect())

[[1, 11, 21], [2, 12, 22], [3, 13, 23], [4, 14, 24]]
[1, 11, 21, 2, 12, 22, 3, 13, 23, 4, 14, 24]


## GroupBy

In [24]:
groupby_data = [
    'Sanjay', 'Sanjit',
    'Prakash', 'Pragatheesh',
    'Vivek',
]
ParelRDD7 = sc.parallelize(groupby_data)

In [34]:
# groupBy() keys each name by its first letter. mapValues(list) turns each
# grouped iterable into a plain list on the executors, so the collected
# result prints directly without a manual comprehension.
res = ParelRDD7.groupBy(lambda name: name[0]).mapValues(list).collect()
print(res)

[('S', ['Sanjay', 'Sanjit']), ('P', ['Prakash', 'Pragatheesh']), ('V', ['Vivek'])]


## GroupByKey

In [38]:
groupbykey_data = [
    ('A', 1), ('B', 2), ('C', 3),
    ('A', 5), ('B', 9),
]
ParelRDD8 = sc.parallelize(groupbykey_data)

In [40]:
# groupByKey() gathers all values sharing a key; mapValues(list) materialises
# each group as a plain list so the collected pairs print directly.
res2 = ParelRDD8.groupByKey().mapValues(list).collect()
print(res2)

[('A', [1, 5]), ('B', [2, 9]), ('C', [3])]


## MapPartitions

In [51]:
# Explicitly split into 3 partitions so the per-partition demos below are visible.
ParelRDD9 = sc.parallelize(list(range(1, 7)), numSlices=3)

In [52]:
def func(partition):
    """Yield the sum of one partition's elements.

    Used with ``RDD.mapPartitions``: Spark passes an iterable over a single
    partition, and the function yields exactly one value per partition.
    """
    yield sum(partition)

# One sum per partition, so 3 results for the 3 partitions.
ParelRDD9.mapPartitions(func).collect()
    

[3, 7, 11]

## MapPartitionsWithIndex

In [56]:
def func2(i, partition):
    """Yield ``(partition_index, sum_of_partition)`` for one partition.

    Used with ``RDD.mapPartitionsWithIndex``, which passes the partition's
    index as ``i`` and an iterable over its elements as ``partition``.
    """
    yield (i, sum(partition))

# Like mapPartitions, but each result is tagged with its partition index.
ParelRDD9.mapPartitionsWithIndex(func2).collect()

[(0, 3), (1, 7), (2, 11)]

## ReduceByKey

In [57]:
reducebykey_data = "sanjay prakash sanjay gnana prakash".split()
ParelRDD10 = sc.parallelize(reducebykey_data)

In [64]:
# Classic word count: pair each word with 1, then sum the 1s per word.
x = (ParelRDD10
     .map(lambda word: (word, 1))
     .reduceByKey(lambda left, right: left + right))
x.collect()

[('prakash', 2), ('sanjay', 2), ('gnana', 1)]

## Sample

In [65]:
ParelRDD11 = sc.parallelize(list(range(1, 7)))

In [93]:
# Sample without replacement. `fraction` is the expected proportion kept, not an
# exact count, and a fixed seed makes the result reproducible on re-run.
# NOTE: the recorded output ([1, 3]) came from an unseeded run and may differ.
ParelRDD11.sample(withReplacement=False, fraction=0.4, seed=42).collect()

[1, 3]

## Union

In [94]:
x = sc.parallelize(list(range(1, 5)))
y = sc.parallelize([5, 6])

In [95]:
# union() concatenates the two RDDs; duplicates are not removed.
x.union(y).collect()

[1, 2, 3, 4, 5, 6]

## Join

In [96]:
# Two pair RDDs keyed by letter, used by join() below.
# NOTE(review): the union_* names are misleading — these feed the Join demo.
union_data1 = sc.parallelize([
    ("a", 2), ("b", 4), ("c", 9),
])
union_data2 = sc.parallelize([
    ("a", 5), ("b", 7),
])

In [103]:
# Inner join on key: "c" has no partner in union_data2, so it is absent from the result.
union_data1.join(union_data2).collect()

[('a', (2, 5)), ('b', (4, 7))]

## Distinct

In [104]:
# distinct() drops duplicates; as the output shows, result order is not preserved.
sc.parallelize([1,2,3,3,4,4]).distinct().collect()

[4, 1, 2, 3]

## Coalesce

In [108]:
# glom() exposes each partition as a list so the effect of coalesce(2) is visible.
ParelRDD12 = sc.parallelize(list(range(1, 9)), 4)
y = ParelRDD12.coalesce(2)
for rdd_parts in (ParelRDD12.glom(), y.glom()):
    print(rdd_parts.collect())

[[1, 2], [3, 4], [5, 6], [7, 8]]
[[1, 2, 3, 4], [5, 6, 7, 8]]


## KeyBy

In [109]:
# keyBy() builds (f(element), element) pairs — here keyed by first letter.
keyBy_rdd = sc.parallelize('Sanjay Prakash Zaheer Deepak'.split())
keyBy_rdd.keyBy(lambda name: name[0]).collect()

[('S', 'Sanjay'), ('P', 'Prakash'), ('Z', 'Zaheer'), ('D', 'Deepak')]

## Zip

In [110]:
a = sc.parallelize(list(range(2, 5)))
b = a.map(lambda v: v * v)  # derived from `a`, so element counts line up for zip()

In [111]:
# zip() pairs elements positionally: (value, value squared).
a.zip(b).collect()

[(2, 4), (3, 9), (4, 16)]

# Actions

## Aggregate

In [112]:
# Nine elements across 3 partitions, reused by the action examples below.
x = sc.parallelize(list(range(1, 10)), 3)

In [114]:
x.aggregate(0,lambda x,y:x+y,lambda x,y:x+y)

45

In [117]:
# seqOp = max within each partition; combOp = product of the partition maxima.
# The zero value 1 also seeds combOp. Assuming partitions [1,2,3],[4,5,6],[7,8,9],
# this gives 1 * 3 * 6 * 9 = 162, matching the output.
x.aggregate(1, max, lambda left, right: left * right)

162

## Take

In [118]:
# take(n) returns only the first n elements to the driver.
x.take(5)

[1, 2, 3, 4, 5]

## Reduce

In [119]:
# Lambda parameters renamed so they do not shadow the RDD named `x`.
x.reduce(lambda left, right: left + right)

45

## Collect

In [121]:
# Displaying the RDD object shows only its description, not its data; use collect() for that.
x

ParallelCollectionRDD[232] at readRDDFromFile at PythonRDD.scala:262

In [120]:
# collect() brings every element back to the driver — keep this to small RDDs.
x.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]

## Max, Sum, Mean, Stdev

In [123]:
# Largest element.
x.max()

9

In [124]:
# Sum of all elements.
x.sum()

45

In [125]:
# Arithmetic mean (returned as a float).
x.mean()

5.0

In [126]:
# Population standard deviation (divides by N): sqrt(60/9) ≈ 2.582 for 1..9.
# Use sampleStdev() for the N-1 (sample) version.
x.stdev()

2.581988897471611

## CountByKey

In [129]:
# countByKey() returns a key -> occurrence-count mapping to the driver.
sc.parallelize([
    ('S', 'Sanjay'),
    ('P', 'Prakash'),
    ('R', 'Ranjit'),
    ('S', 'Sanjit'),
]).countByKey()

defaultdict(int, {'S': 2, 'P': 1, 'R': 1})