# <center> PySpark Core Programming - RDD Programming </center>

### PySpark Core Program in Interactive Mode using SparkContext

In [2]:
print(sc)
print(type(sc))

<SparkContext master=local[*] appName=PySparkShell>
<class 'pyspark.context.SparkContext'>


In [1]:
rdd1 = sc.textFile("C:\\Cfamily IT\\Data\\comment.txt")

print("No.of Rows of an RDD:",rdd1.count())
print("*"*30)
print("All rows of an RDD:")
print("====================")
for row in rdd1.collect():
    print(row)

print("Few rows of an RDD:")
print("====================")
for row in rdd1.take(2):
    print(row)


No.of Rows of an RDD: 4
******************************
All rows of an RDD:
spark hadoop spark hive
spark spark hive hive
hadoop hdfs hadoop hdfs
hadoop hdfs hive spark
Few rows of an RDD:
spark hadoop spark hive
spark spark hive hive


### PySpark Core Program in In teractive Mode using SparkSession

In [3]:
print(spark)
print(type(spark))

<pyspark.sql.session.SparkSession object at 0x0000022F37CA07C0>
<class 'pyspark.sql.session.SparkSession'>


In [5]:
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt")

print("No.of Rows of an RDD:",rdd1.count())
print("*"*30)
print("All rows of an RDD:")
print("====================")
for row in rdd1.collect():
    print(row)

print("Few rows of an RDD:")
print("====================")
for row in rdd1.take(2):
    print(row)


No.of Rows of an RDD: 4
******************************
All rows of an RDD:
spark hadoop spark hive
spark spark hive hive
hadoop hdfs hadoop hdfs
hadoop hdfs hive spark
Few rows of an RDD:
spark hadoop spark hive
spark spark hive hive


## How to create an RDD?
### 1.Create an RDD by loading file

In [1]:
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt")
print("Count no.of rows of an RDD:",rdd1.count())
print("*"*30)

print("All rows of an RDD:")
print("*"*30)
for row in rdd1.collect():
    print(row)

print("Few rows of an RDD:")
print("*"*30)
for row in rdd1.take(2):
    print(row)

Count no.of rows of an RDD: 4
******************************
All rows of an RDD:
******************************
spark hadoop spark hive
spark spark hive hive
hadoop hdfs hadoop hdfs
hadoop hdfs hive spark
Few rows of an RDD:
******************************
spark hadoop spark hive
spark spark hive hive


### Create an RDD by converting Local Collection

In [2]:
li = [10,20,30,40,50]
rdd1 = spark.sparkContext.parallelize(li)
print("Count no.of rows of an RDD:",rdd1.count())
print("*"*30)

print("All rows of an RDD:")
print("*"*30)
for row in rdd1.collect():
    print(row)
print("*"*30)
print("Few rows of an RDD:")
print("*"*30)
for row in rdd1.take(2):
    print(row)
print("*"*30)

Count no.of rows of an RDD: 5
******************************
All rows of an RDD:
******************************
10
20
30
40
50
******************************
Few rows of an RDD:
******************************
10
20
******************************


### Create an RDD by applying transformation on existing RDD

In [3]:
li = [10,20,30,40,50]
rdd1 = spark.sparkContext.parallelize(li)
print("Before Transformation All rows of an RDD:")
print("*"*30)
for row in rdd1.collect():
    print(row)
print("*"*30)

rdd2 = rdd1.map(lambda x:x+1)
print("After Transformation All rows of an RDD:")
print("*"*30)
for row in rdd2.collect():
    print(row)
print("*"*30)

Before Transformation All rows of an RDD:
******************************
10
20
30
40
50
******************************
After Transformation All rows of an RDD:
******************************
11
21
31
41
51
******************************


### map() Transformation

In [3]:
li = [10,20,30,40,50]
rdd1 = spark.sparkContext.parallelize(li)
print("Before map() Transformation:")
for row in rdd1.collect():
    print(row)
print("*"*30)

rdd2 = rdd1.map(lambda x:x+1)
print("After map() Transformation:")
for row in rdd2.collect():
    print(row)
print("*"*30)

Before map() Transformation:
10
20
30
40
50
******************************
After map() Transformation:
11
21
31
41
51
******************************


### flatMap() Transformation

In [6]:
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt")
print("Before flatMap() Transformation:")
print("*"*30)
for row in rdd1.collect():
    print(row)
print("*"*30)

rdd2 = rdd1.flatMap(lambda x:x.split(" "))
print("After flatMap() Transformation:")
print("*"*30)
for row in rdd2.collect():
    print(row)
print("*"*30)

Before flatMap() Transformation:
******************************
spark hadoop spark hive
spark spark hive hive
hadoop hdfs hadoop hdfs
hadoop hdfs hive spark
******************************
After flatMap() Transformation:
******************************
spark
hadoop
spark
hive
spark
spark
hive
hive
hadoop
hdfs
hadoop
hdfs
hadoop
hdfs
hive
spark
******************************


### filter() Transformation

In [10]:
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt")
print("Before filter() Transformation:")
print("*"*30)
for row in rdd1.collect():
    print(row)
print("*"*30)

rdd2 = rdd1.filter(lambda x:"hadoop" in x)
print("After filter() Transformation:")
print("*"*30)
for row in rdd2.collect():
    print(row)
print("*"*30)

Before filter() Transformation:
******************************
spark hadoop spark hive
spark spark hive hive
hadoop hdfs hadoop hdfs
hadoop hdfs hive spark
******************************
After filter() Transformation:
******************************
spark hadoop spark hive
hadoop hdfs hadoop hdfs
hadoop hdfs hive spark
******************************


### reduceByKey() transformation
### E.g:- Write PySpark RDD Program to find Word Count (Each Word repeated how many times?)

In [3]:
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt")
for row in rdd1.collect():
    print(row)
print("*"*30)
rdd2 = rdd1.flatMap(lambda x:x.split(" "))
for row in rdd2.collect():
    print(row)
print("*"*30)

rdd3 = rdd2.map(lambda x:(x,1))
for row in rdd3.collect():
    print(row)
print("*"*30)

rdd4 = rdd3.reduceByKey(lambda x,y:x+y)
for row in rdd4.collect():
    print(row)
print("*"*30)

spark hadoop spark hive
spark spark hive hive
hadoop hdfs hadoop hdfs
hadoop hdfs hive spark
******************************
spark
hadoop
spark
hive
spark
spark
hive
hive
hadoop
hdfs
hadoop
hdfs
hadoop
hdfs
hive
spark
******************************
('spark', 1)
('hadoop', 1)
('spark', 1)
('hive', 1)
('spark', 1)
('spark', 1)
('hive', 1)
('hive', 1)
('hadoop', 1)
('hdfs', 1)
('hadoop', 1)
('hdfs', 1)
('hadoop', 1)
('hdfs', 1)
('hive', 1)
('spark', 1)
******************************
('hadoop', 4)
('hive', 4)
('hdfs', 3)
('spark', 5)
******************************


In [4]:
wordCountRDD = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt").flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
for row in wordCountRDD.collect():
    print(row)
print("*"*30)

('hadoop', 4)
('hive', 4)
('hdfs', 3)
('spark', 5)
******************************


## Partitions in PySpark

In [1]:
# small file -- Local File
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt")
print("No.of Partitions of an RDD:",rdd1.getNumPartitions())

No.of Partitions of an RDD: 2


In [4]:
# big file -- Local File
rdd2 = spark.sparkContext.textFile("C:/Cfamily IT/Data/bigfile.csv")
print("No.of Partitions of an RDD:",rdd2.getNumPartitions())

No.of Partitions of an RDD: 27


In [5]:
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt",4)
print("No.of Partitions of an RDD:",rdd1.getNumPartitions())

No.of Partitions of an RDD: 5


### repartition()

In [10]:
#default Partitions
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt")
print("Default No.of Partitions of an RDD:",rdd1.getNumPartitions())

#Increase Partitions
rdd2 = rdd1.repartition(10)
print("Increased No.of Partitions of an RDD:",rdd2.getNumPartitions())

#Decrease Partitions
rdd3 = rdd2.repartition(5)
print("Decreased No.of Partitions of an RDD:",rdd3.getNumPartitions())

Default No.of Partitions of an RDD: 2
Increased No.of Partitions of an RDD: 10
Decreased No.of Partitions of an RDD: 5


In [11]:
#default Partitions
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt")
print("Default No.of Partitions of an RDD:",rdd1.getNumPartitions())

#Increase Partitions
rdd2 = rdd1.repartition(10)
print("Increased No.of Partitions of an RDD:",rdd2.getNumPartitions())

#Decrease Partitions
rdd3 = rdd2.coalesce(5)
print("Decreased No.of Partitions of an RDD:",rdd3.getNumPartitions())

Default No.of Partitions of an RDD: 2
Increased No.of Partitions of an RDD: 10
Decreased No.of Partitions of an RDD: 5


In [13]:
#default Partitions
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt")
print("Default No.of Partitions of an RDD:",rdd1.getNumPartitions())

#Increase Partitions
rdd2 = rdd1.coalesce(10)
print("Increased No.of Partitions of an RDD:",rdd2.getNumPartitions())
#Decrease Partitions
rdd3 = rdd2.coalesce(1)
print("Decreased No.of Partitions of an RDD:",rdd3.getNumPartitions())

Default No.of Partitions of an RDD: 2
Increased No.of Partitions of an RDD: 2
Decreased No.of Partitions of an RDD: 1


## <center> Persistence </center>

In [4]:
from pyspark import StorageLevel
## Step1 : Create an RDD
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/bigfile.csv")

## Verify an RDD Peristed or Not
## got to Spark Web UI and Check

### Upto now there are no RDD's persisted.

## Step3: Persist an RDD
rdd1.persist(StorageLevel.MEMORY_ONLY) # Here we may going to gerror saying "NameError: name 'StorageLevel' is not defined"

### How to resolve -- We  need to import "SorageLevel" class

## Verify an RDD Peristed or Not in Spark Web UI

### Still We are not seeing -- Why
### RDDs are Lazily Evaluated -- Until we perform an Action it will not persist
rdd1.count()



C:/Cfamily IT/Data/bigfile.csv MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
## Step4 : un-persist an RDD
rdd1.unpersist()


C:/Cfamily IT/Data/bigfile.csv MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0

In [6]:
## Step5: Persist an RDD
rdd1.persist(StorageLevel.MEMORY_ONLY_2)
rdd1.count()
## Verify an RDD Peristed or Not in Spark Web UI

10596993

In [7]:
## Step6 : un-persist an RDD
rdd1.unpersist()

C:/Cfamily IT/Data/bigfile.csv MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0

In [8]:
## Step5: Persist an RDD
rdd1.persist(StorageLevel.DISK_ONLY)
rdd1.count()
## Verify an RDD Peristed or Not in Spark Web UI

10596993

In [9]:
## Step8 : un-persist an RDD
rdd1.unpersist()


C:/Cfamily IT/Data/bigfile.csv MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0

## <center> Set Operators or Operations in PySpark </center>

### Union Set Operator

In [8]:
calllogRDD = spark.sparkContext.textFile("C:/Cfamily IT/Data/calllogdata")
print("*"*30)
print("No.of Records of calllogRDD:",calllogRDD.count())
print("No.of Partitions of calllogRDD:",calllogRDD.getNumPartitions())

successRDD = calllogRDD.filter(lambda x:"SUCCESS" in x)
print("No.of Records of successRDD:",successRDD.count())
print("No.of default Partitions of successRDD:",successRDD.getNumPartitions())
successRDD = successRDD.repartition(10)
print("No.of Modified Partitions of successRDD:",successRDD.getNumPartitions())

droppedRDD = calllogRDD.filter(lambda x:"DROPPED" in x)
print("No.of Records of droppedRDD:",droppedRDD.count())
print("No.of Partitions of droppedRDD:",droppedRDD.getNumPartitions())
droppedRDD = droppedRDD.repartition(8)
print("No.of Modified Partitions of droppedRDD:",droppedRDD.getNumPartitions())


failedRDD = calllogRDD.filter(lambda x:"FAILED" in x)
print("No.of Records of failedRDD:",failedRDD.count())
print("No.of Partitions of failedRDD:",failedRDD.getNumPartitions())
failedRDD = failedRDD.repartition(6)
print("No.of Modified Partitions of failedRDD:",failedRDD.getNumPartitions())
print("*"*30)

print("UNION Operation:")
print("*"*30)
unionRDD = calllogRDD.union(successRDD) #206+155 =>361 (records), 2+10=>12 partitions(assume)
print("No.of Records of unionRDD:",unionRDD.count())
print("No.of Partitions of unionRDD:",unionRDD.getNumPartitions())


******************************
No.of Records of calllogRDD: 206
No.of Partitions of calllogRDD: 2
No.of Records of successRDD: 155
No.of default Partitions of successRDD: 2
No.of Modified Partitions of successRDD: 10
No.of Records of droppedRDD: 28
No.of Partitions of droppedRDD: 2
No.of Modified Partitions of droppedRDD: 8
No.of Records of failedRDD: 23
No.of Partitions of failedRDD: 2
No.of Modified Partitions of failedRDD: 6
******************************
UNION Operation:
******************************
No.of Records of unionRDD: 361
No.of Partitions of unionRDD: 12


### Subtraction Operator in PySpark

In [10]:
calllogRDD = spark.sparkContext.textFile("C:/Cfamily IT/Data/calllogdata")
print("*"*30)
print("No.of Records of calllogRDD:",calllogRDD.count())
print("No.of Partitions of calllogRDD:",calllogRDD.getNumPartitions())

successRDD = calllogRDD.filter(lambda x:"SUCCESS" in x)
print("No.of Records of successRDD:",successRDD.count())
print("No.of default Partitions of successRDD:",successRDD.getNumPartitions())
successRDD = successRDD.repartition(10)
print("No.of Modified Partitions of successRDD:",successRDD.getNumPartitions())

droppedRDD = calllogRDD.filter(lambda x:"DROPPED" in x)
print("No.of Records of droppedRDD:",droppedRDD.count())
print("No.of Partitions of droppedRDD:",droppedRDD.getNumPartitions())
droppedRDD = droppedRDD.repartition(8)
print("No.of Modified Partitions of droppedRDD:",droppedRDD.getNumPartitions())


failedRDD = calllogRDD.filter(lambda x:"FAILED" in x)
print("No.of Records of failedRDD:",failedRDD.count())
print("No.of Partitions of failedRDD:",failedRDD.getNumPartitions())
failedRDD = failedRDD.repartition(6)
print("No.of Modified Partitions of failedRDD:",failedRDD.getNumPartitions())
print("*"*30)

print("Subtraction Operation:")
print("*"*30)
subtractionRDD = calllogRDD.subtract(successRDD) #206-155 =>51 (records), 2+10=>12 partitions(assume)
print("No.of Records of subtractionRDD:",subtractionRDD.count())
print("No.of Partitions of subtractionRDD:",subtractionRDD.getNumPartitions())


******************************
No.of Records of calllogRDD: 206
No.of Partitions of calllogRDD: 2
No.of Records of successRDD: 155
No.of default Partitions of successRDD: 2
No.of Modified Partitions of successRDD: 10
No.of Records of droppedRDD: 28
No.of Partitions of droppedRDD: 2
No.of Modified Partitions of droppedRDD: 8
No.of Records of failedRDD: 23
No.of Partitions of failedRDD: 2
No.of Modified Partitions of failedRDD: 6
******************************
Subtraction Operation:
******************************
No.of Records of subtractionRDD: 51
No.of Partitions of subtractionRDD: 12


### Intersection Operator in PySpark

In [11]:
calllogRDD = spark.sparkContext.textFile("C:/Cfamily IT/Data/calllogdata")
print("*"*30)
print("No.of Records of calllogRDD:",calllogRDD.count())
print("No.of Partitions of calllogRDD:",calllogRDD.getNumPartitions())

successRDD = calllogRDD.filter(lambda x:"SUCCESS" in x)
print("No.of Records of successRDD:",successRDD.count())
print("No.of default Partitions of successRDD:",successRDD.getNumPartitions())
successRDD = successRDD.repartition(10)
print("No.of Modified Partitions of successRDD:",successRDD.getNumPartitions())

droppedRDD = calllogRDD.filter(lambda x:"DROPPED" in x)
print("No.of Records of droppedRDD:",droppedRDD.count())
print("No.of Partitions of droppedRDD:",droppedRDD.getNumPartitions())
droppedRDD = droppedRDD.repartition(8)
print("No.of Modified Partitions of droppedRDD:",droppedRDD.getNumPartitions())


failedRDD = calllogRDD.filter(lambda x:"FAILED" in x)
print("No.of Records of failedRDD:",failedRDD.count())
print("No.of Partitions of failedRDD:",failedRDD.getNumPartitions())
failedRDD = failedRDD.repartition(6)
print("No.of Modified Partitions of failedRDD:",failedRDD.getNumPartitions())
print("*"*30)

print("Intersection Operation:")
print("*"*30)
intersectionRDD = calllogRDD.intersection(successRDD) #206-155 =>155 (records), 2+10=>12 partitions(assume)
print("No.of Records of intersectionRDD:",intersectionRDD.count())
print("No.of Partitions of intersectionRDD:",intersectionRDD.getNumPartitions())


******************************
No.of Records of calllogRDD: 206
No.of Partitions of calllogRDD: 2
No.of Records of successRDD: 155
No.of default Partitions of successRDD: 2
No.of Modified Partitions of successRDD: 10
No.of Records of droppedRDD: 28
No.of Partitions of droppedRDD: 2
No.of Modified Partitions of droppedRDD: 8
No.of Records of failedRDD: 23
No.of Partitions of failedRDD: 2
No.of Modified Partitions of failedRDD: 6
******************************
Intersection Operation:
******************************
No.of Records of intersectionRDD: 155
No.of Partitions of intersectionRDD: 12


### Cartesian Operator in PySpark

In [12]:
calllogRDD = spark.sparkContext.textFile("C:/Cfamily IT/Data/calllogdata")
print("*"*30)
print("No.of Records of calllogRDD:",calllogRDD.count())
print("No.of Partitions of calllogRDD:",calllogRDD.getNumPartitions())

successRDD = calllogRDD.filter(lambda x:"SUCCESS" in x)
print("No.of Records of successRDD:",successRDD.count())
print("No.of default Partitions of successRDD:",successRDD.getNumPartitions())
successRDD = successRDD.repartition(10)
print("No.of Modified Partitions of successRDD:",successRDD.getNumPartitions())

droppedRDD = calllogRDD.filter(lambda x:"DROPPED" in x)
print("No.of Records of droppedRDD:",droppedRDD.count())
print("No.of Partitions of droppedRDD:",droppedRDD.getNumPartitions())
droppedRDD = droppedRDD.repartition(8)
print("No.of Modified Partitions of droppedRDD:",droppedRDD.getNumPartitions())


failedRDD = calllogRDD.filter(lambda x:"FAILED" in x)
print("No.of Records of failedRDD:",failedRDD.count())
print("No.of Partitions of failedRDD:",failedRDD.getNumPartitions())
failedRDD = failedRDD.repartition(6)
print("No.of Modified Partitions of failedRDD:",failedRDD.getNumPartitions())
print("*"*30)

print("Cartesian Operation:")
print("*"*30)
cartesianRDD = calllogRDD.cartesian(successRDD) #206-155 =>155 (records), 2*10=>20 partitions(assume)
print("No.of Records of cartesianRDD:",cartesianRDD.count())
print("No.of Partitions of cartesianRDD:",cartesianRDD.getNumPartitions())


******************************
No.of Records of calllogRDD: 206
No.of Partitions of calllogRDD: 2
No.of Records of successRDD: 155
No.of default Partitions of successRDD: 2
No.of Modified Partitions of successRDD: 10
No.of Records of droppedRDD: 28
No.of Partitions of droppedRDD: 2
No.of Modified Partitions of droppedRDD: 8
No.of Records of failedRDD: 23
No.of Partitions of failedRDD: 2
No.of Modified Partitions of failedRDD: 6
******************************
Cartesian Operation:
******************************
No.of Records of cartesianRDD: 31930
No.of Partitions of cartesianRDD: 20


## <center> Aggregation Operators in PySpark </center>
### reduce aggregation operation

In [4]:
li = [1,2,3,4,5,6,7,8,9]
rdd1 = spark.sparkContext.parallelize(li,2)
for row in rdd1.collect():
    print(row)
print("*"*20)

res = rdd1.reduce(lambda x,y:x+y)
print(res)

1
2
3
4
5
6
7
8
9
********************
45


### fold aggregation operation

In [5]:
li = [1,2,3,4,5,6,7,8,9]
rdd1 = spark.sparkContext.parallelize(li,2)
for row in rdd1.collect():
    print(row)
print("*"*20)

res = rdd1.fold(2,lambda x,y:x+y)
print(res)

1
2
3
4
5
6
7
8
9
********************
51


### aggregate aggregation operation

In [6]:
li = [1,2,3,4,5,6,7,8,9]
rdd1 = spark.sparkContext.parallelize(li,2)
for row in rdd1.collect():
    print(row)
print("*"*20)

res = rdd1.aggregate(2,lambda x,y:(x+y),lambda a,b:(a*b))
print(res)

1
2
3
4
5
6
7
8
9
********************
888


## <center> Pair RDD in PySpark </center>

In [2]:
rdd1 = spark.sparkContext.textFile("C:/Cfamily IT/Data/comment.txt")
print("Before Key,Value Pair:")
for row in rdd1.collect():
    print(row)
print("*"*30)

rdd2 = rdd1.flatMap(lambda x:x.split(" "))
rdd3 = rdd2.map(lambda x:(x,1))
print("After Key,Value Pair:")
for row in rdd3.collect():
    print(row)
print("*"*30)

Before Key,Value Pair:
spark hadoop spark hive
spark spark hive hive
hadoop hdfs hadoop hdfs
hadoop hdfs hive spark
******************************
After Key,Value Pair:
('spark', 1)
('hadoop', 1)
('spark', 1)
('hive', 1)
('spark', 1)
('spark', 1)
('hive', 1)
('hive', 1)
('hadoop', 1)
('hdfs', 1)
('hadoop', 1)
('hdfs', 1)
('hadoop', 1)
('hdfs', 1)
('hive', 1)
('spark', 1)
******************************


In [4]:
li = [1,2,3,4,5,6]
rdd1 = spark.sparkContext.parallelize(li)
print("Before Key,Value Pair:")
print(type(rdd1))
for row in rdd1.collect():
    print(row)
print("*"*30)

rdd2 =  rdd1.map(lambda x:(x,10))
print("After Key,Value Pair:")
print(type(rdd1))
for row in rdd2.collect():
    print(row)
print("*"*30)

Before Key,Value Pair:
<class 'pyspark.rdd.RDD'>
1
2
3
4
5
6
******************************
After Key,Value Pair:
<class 'pyspark.rdd.RDD'>
(1, 10)
(2, 10)
(3, 10)
(4, 10)
(5, 10)
(6, 10)
******************************


In [7]:
li = [1,2,3,4,5,6]
rdd1 = spark.sparkContext.parallelize(li)
print("Before Key,Value Pair:")
for row in rdd1.collect():
    print(row)
print("*"*30)

rdd2 =  rdd1.map(lambda x:(x,10))
print("After Key,Value Pair:")
for row in rdd2.collect():
    print(row)
print("*"*30)

print("Only Keys:")
keys = rdd2.keys()
for row in keys.collect():
    print(row)
print("*"*30)

print("Only Values:")
values = rdd2.values()
for row in values.collect():
    print(row)
print("*"*30)

print("Count by Key:",rdd2.countByKey())
print("*"*30)

print("Count by Value:",rdd2.countByValue())
print("*"*30)

Before Key,Value Pair:
1
2
3
4
5
6
******************************
After Key,Value Pair:
(1, 10)
(2, 10)
(3, 10)
(4, 10)
(5, 10)
(6, 10)
******************************
Only Keys:
1
2
3
4
5
6
******************************
Only Values:
10
10
10
10
10
10
******************************
Count by Key: defaultdict(<class 'int'>, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1})
******************************
Count by Value: defaultdict(<class 'int'>, {(1, 10): 1, (2, 10): 1, (3, 10): 1, (4, 10): 1, (5, 10): 1, (6, 10): 1})
******************************
