In [0]:
# http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
# http://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis

# Sample Code
# https://supergloo.com/spark-python/apache-spark-transformations-python-examples
# https://www.analyticsvidhya.com/blog/2016/10/using-pyspark-to-perform-transformations-and-actions-on-rdd/

In [0]:
## map Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.map.html#pyspark.RDD.map

text = ["you are my sunshine", "my only sunshine"]
text_file = sc.parallelize(text)
# map() emits exactly one output element per input line -- here, that line's list of words --
# so the collected result is a list of lists (one inner list per line).
words_by_line = text_file.map(lambda line: line.split(" "))
print('map:', words_by_line.collect())


In [0]:
## filter Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.flatMap.html#pyspark.RDD.flatMap
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.filter.html#pyspark.RDD.filter
## Upload the file first:
## https://docs.databricks.com/data/data.html

## In case the DBFS upload button is not enabled, go to Admin Console > Workspace Settings and enable DBFS.
## https://docs.databricks.com/administration-guide/admin-console.html
## https://docs.databricks.com/administration-guide/workspace/dbfs-ui-upload.html

names = sc.textFile("dbfs:/FileStore/data/names.csv")
# One element per line of the file (the header line included), split into its comma-separated fields.
rows = names.map(lambda line: line.split(","))

## Keep rows in which one of the fields is exactly "MICHAEL". `in` on a list compares whole
## fields, so a value such as "MICHAELA" would not match.
michael_rows = rows.filter(lambda fields: "MICHAEL" in fields)
michael_rows.collect()

In [0]:
##  flatMap Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.flatMap.html#pyspark.RDD.flatMap
text = ["you are my sunshine", "my only sunshine"]
text_file = sc.parallelize(text)
# flatMap() flattens the per-line word lists, so every word of every line ends up
# in one flat (1-D) list instead of one list per line.
all_words = text_file.flatMap(lambda line: line.split(" "))
print('flatmap:', all_words.collect())

In [0]:
## mapPartitions Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapPartitions.html#pyspark.RDD.mapPartitions
# Nine elements split into 3 explicit partitions: [1, 2, 3], [4, 5, 6], [7, 8, 9].
one_to_nine = range(1, 10)
parallel = sc.parallelize(one_to_nine, numSlices=3)
def f(iterator):
    """Reduce one partition to a single element: the sum of its values.

    Intended for ``mapPartitions``: it receives an iterator over one partition's
    elements and yields exactly one value, so the resulting RDD holds one total
    per partition (an empty partition yields 0).
    """
    partition_total = sum(iterator)
    yield partition_total
  
print(' Parallelism overridden with partitions ' , parallel.getNumPartitions() )
print(parallel.mapPartitions(f).collect())

# With 3 explicit partitions the nine elements split evenly:
#Partition 1: 1+2+3 = 6
#Partition 2: 4+5+6 = 15
#Partition 3: 7+8+9 = 24



# Without an explicit partition count, parallelize() uses sc.defaultParallelism slices,
# so the partition count (and therefore the output below) depends on the cluster.
parallel = sc.parallelize(one_to_nine)

# These are two different quantities, so they get two different labels:
# the context-wide default, and the actual partition count of this RDD.
print(' Default parallelism ' , sc.defaultParallelism)
print(' Number of partitions ' , parallel.getNumPartitions() )

print(parallel.mapPartitions(f).collect())

## Explanation of the output
## NOTE: this breakdown corresponds to 8 partitions (e.g. sc.defaultParallelism == 8);
## on a cluster with a different default, both the split and the sums will differ.
#Partition 1 = 1
#Partition 2 = 2
#Partition 3 = 3
#Partition 4 = 4
#Partition 5 = 5
#Partition 6 = 6
#Partition 7 = 7
#Partition 8: 8+9 = 17

In [0]:
## mapPartitionsWithIndex Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapPartitionsWithIndex.html#pyspark.RDD.mapPartitionsWithIndex
# Nine elements spread over 4 partitions.
parallel = sc.parallelize(range(1, 10), numSlices=4)
def show(index, iterator):
    """Describe one partition as a single string holding its index and its elements.

    Intended for ``mapPartitionsWithIndex``, which passes the partition index and an
    iterator over that partition's elements; exactly one string is yielded per partition.
    """
    yield 'index: {} values: {}'.format(index, list(iterator))
print(parallel.mapPartitionsWithIndex(show).collect())

## Repeat with 3 partitions to see how the grouping of values per index changes.
parallel = sc.parallelize(range(1, 10), numSlices=3)
print(parallel.mapPartitionsWithIndex(show).collect())

In [0]:
## sample Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sample.html#pyspark.RDD.sample
parallel = sc.parallelize(range(1, 10))

## Sampling with replacement: fraction is the expected number of times each element is drawn,
## so count() is only approximately 0.2 * 9.
## Without a seed, each call draws independently, so the two counts may or may not match.
print(parallel.sample(withReplacement=True, fraction=0.2).count())
print(parallel.sample(withReplacement=True, fraction=0.2).count())

## With a fixed seed the draw is repeatable, so these two counts are always equal.
print(parallel.sample(withReplacement=True, fraction=0.2, seed=1).count())
print(parallel.sample(withReplacement=True, fraction=0.2, seed=1).count())

In [0]:
## union Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.union.html#pyspark.RDD.union

# union simply concatenates the two RDDs: duplicates are kept (bag semantics), and the
# element types of the two sides need not match.
rdd1 = sc.parallelize([1, 1, 2, 3])
rdd2 = sc.parallelize(['a', 'b', 1])
for label, source in (('rdd1=', rdd1), ('rdd2=', rdd2)):
    print(label, source.collect())
print('union as bags =', rdd1.union(rdd2).collect())

In [0]:
## intersection Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.intersection.html#pyspark.RDD.intersection

rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])

for label, source in (('rdd1=', rdd1), ('rdd2=', rdd2)):
    print(label, source.collect())
# Only elements present in both RDDs survive; the result contains no duplicates, and because
# intersection shuffles internally, its order is not guaranteed.
common = rdd1.intersection(rdd2)
print('intersection =', common.collect())

In [0]:
## distinct transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.distinct.html#pyspark.RDD.distinct

rdd = sc.parallelize([1, 1, 2, 2, 3, 4, 5, 5])
# Each value is kept once; the result order may differ from the input order.
unique_values = rdd.distinct()
print('Distinct = ', unique_values.collect())

In [0]:
## groupByKey Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.groupByKey.html#pyspark.RDD.groupByKey

rdd = sc.parallelize([('a',2), ('b',4), ('b',6)])
print("Original RDD :", rdd.collect())

# groupByKey produces (key, iterable-of-values) pairs. The iterable does not print its
# contents, so each demo below uses mapValues to turn it into something readable.
# Each demo gets its own label so the three outputs can be told apart.
grouped = rdd.groupByKey()
print("After groupByKey, values as list : ", grouped.mapValues(list).collect())
print("After groupByKey, each value doubled : ", grouped.mapValues(lambda values: [v + v for v in values]).collect())
print("After groupByKey, number of values per key : ", grouped.mapValues(len).collect())

In [0]:
## reduceByKey Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduceByKey.html#pyspark.RDD.reduceByKey
from operator import add

# Values sharing a key are merged pairwise with the supplied function -- here, summed.
rdd = sc.parallelize([(1, 2), (2, 4), (2, 6)])
print("Original RDD :", rdd.collect())
print("After transformation : ", rdd.reduceByKey(add).collect())

rdd2 = sc.parallelize([("a", 4), ("b", 2), ("a", 6)])
print("Original RDD :", rdd2.collect())
print('After transformation ', rdd2.reduceByKey(lambda left, right: left + right).collect())

In [0]:
## aggregateByKey Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.aggregateByKey.html#pyspark.RDD.aggregateByKey
names = sc.textFile("dbfs:/FileStore/data/names.csv")
print('Original file ', names.collect())

## Skip the header row by comparing against the file's first line exactly. A substring test
## such as `"Count" not in line` would also silently drop any data row that happens to
## contain "Count" inside one of its fields.
header = names.first()
filtered_rows = names.filter(lambda line: line != header).map(lambda line: line.split(","))
print('Filtered rows ' , filtered_rows.collect())

## Group by Name and add up the Count.
## Note MICHAEL is repeated twice, hence its Count is the sum, i.e. 25 + 18.
## Index of the data : Year - 0, Name - 1, Country - 2, Gender - 3, Count - 4
name_counts = filtered_rows.map(lambda fields: (fields[1], int(fields[4])))
totals = name_counts.aggregateByKey(
    0,                                   # zeroValue: starting total for each key within a partition
    lambda total, count: total + count,  # seqFunc: fold one record's Count into the partition total
    lambda left, right: left + right,    # combFunc: merge per-partition totals for the same key
)
print('After transformation ' , totals.collect())



In [0]:
## sortByKey Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sortByKey.html#pyspark.RDD.sortByKey

list_tupple_pair = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
# String keys sort lexicographically, so the digit keys '1' and '2' come before the letters.
pair_rdd = sc.parallelize(list_tupple_pair)

print(' Transformation sort by first tupple value and print the first element ' , pair_rdd.sortByKey().first())


print(' Transformation sort by ascending, number partitions = 1 ', pair_rdd.sortByKey(ascending=True, numPartitions=1).collect())

print(' Transformation sort by ascending, number partitions = 2 ', pair_rdd.sortByKey(ascending=True, numPartitions=2).collect() )

# keyfunc lowercases the keys only for comparison, so 'Mary' sorts as 'mary';
# the keys stored in the output keep their original case.
list_1 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5),
          ('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]
print(' Sort using custom function ', sc.parallelize(list_1).sortByKey(ascending=True, numPartitions=3, keyfunc=lambda k: k.lower()).collect())

In [0]:
## join Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.join.html#pyspark.RDD.join

# join is an inner join: only keys present on both sides appear ("b" and "c" are dropped),
# and a key matching several values yields one (key, (left, right)) pair per combination.
x_rdd = sc.parallelize([("a", 1), ("b", 4)])
y_rdd = sc.parallelize([("a", 2), ("a", 3)])
first_join = x_rdd.join(y_rdd)
print('x_rdd joined with y_rdd ', first_join.collect())


x_rdd2 = sc.parallelize([("a", 4), ("b", 4)])
y_rdd2 = sc.parallelize([("a", 2), ("a", 5), ("c", 10)])
second_join = x_rdd2.join(y_rdd2)
print('x_rdd2 joined with y_rdd2 ', second_join.collect())


In [0]:
## cogroup Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.cogroup.html#pyspark.RDD.cogroup

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])

# For every key in either RDD, cogroup yields (key, (values_from_x, values_from_y)).
# Both value containers are iterables that do not print their contents, so this raw
# print shows object references rather than the grouped values.
co_group = x.cogroup(y).collect()
print('Co group ' , co_group )

# Materialize each group as a list and sort by key for a stable, readable output.
# The loop variables are named key/groups so they are not confused with the RDDs x and y.
print('Sample output ', [(key, tuple(map(list, groups))) for key, groups in sorted(co_group)])

In [0]:
## cartesian Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.cartesian.html#pyspark.RDD.cartesian

# Pair every element with every element (itself included): n elements -> n * n pairs.
# The same demo runs once for integers and once for strings.
for elements in ([1, 2], ['a', 'b']):
    rdd = sc.parallelize(elements)
    print('Original ', rdd.collect())
    print('After Cartesian Transformation with self ', rdd.cartesian(rdd).collect())


In [0]:
## pipe Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.pipe.html#pyspark.RDD.pipe
# Elements are piped through an external `cat` process, which echoes them back,
# so the output mirrors the input (the empty string included).
piped_input = sc.parallelize(['1', '2', '', '3'])
print( ' Pipe transformation using linux command cat ' , piped_input.pipe('cat').collect())

In [0]:
## coalesce Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.coalesce.html#pyspark.RDD.coalesce
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.glom.html#pyspark.RDD.glom

# Five elements over 3 partitions; glom() turns each partition into a list so the layout is visible.
five_items = sc.parallelize([1, 2, 3, 4, 5], 3)
print( ' Original RDD ' , five_items.glom().collect())
# coalesce(1) merges the existing partitions into a single one (no shuffle by default).
print( ' Coalesce RDD ' , five_items.coalesce(1).glom().collect())

In [0]:
## repartition Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.repartition.html#pyspark.RDD.repartition
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
print(' RDD with partition ', rdd.glom().collect())
# Unlike coalesce, repartition always shuffles the data to build the new partitions.
reshuffled = rdd.repartition(2)
print(' Repartition ', reshuffled.glom().collect())

In [0]:
## repartitionAndSortWithinPartitions Transformation
## http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.repartitionAndSortWithinPartitions.html#pyspark.RDD.repartitionAndSortWithinPartitions
rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
print('rdd ', rdd.glom().collect())

# Route each record by key parity (even keys -> partition 0, odd keys -> partition 1),
# then sort the records by key, ascending, inside each partition.
rdd2 = rdd.repartitionAndSortWithinPartitions(
    numPartitions=2,
    partitionFunc=lambda key: key % 2,
    ascending=True,
)
print('rdd2 ', rdd2.glom().collect())