<a href="https://colab.research.google.com/github/Mery57/Data_Projects/blob/main/PySpark_RDD_groupByKey.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=b1daaa3448a32b271eebb4efe6b35a3325a89fc29da0da6366fd55e1faf374d4
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from __future__ import print_function
import sys
from pyspark.sql import SparkSession

#-----------------------------------------------------
# Apply a groupByKey() transformation to an
# RDD[key, value] to find average per key.
# Input: NONE
#------------------------------------------------------
# Input Parameters:
#    NONE
#-------------------------------------------------------
# @author Mahmoud Parsian
#-------------------------------------------------------

#=========================================
def create_pair(t3):
    """Turn a (name, city, number) record into a (name, number) pair.

    Only positions 0 and 2 of ``t3`` are read; the city at position 1
    is ignored. The number is passed through ``int()`` before being
    returned as the value of the (key, value) pair.
    """
    key, value = t3[0], int(t3[2])
    return (key, value)
#end-def
#==========================================
def main():
    """Demonstrate groupByKey() by computing the average number per name.

    Builds a small in-memory RDD of (name, city, number) tuples, maps it
    to (name, number) pairs, groups the numbers by name and prints the
    per-name average. Intermediate RDDs are printed along the way.

    Takes no input parameters and returns None. The SparkSession is
    stopped even if one of the Spark actions raises.
    """

    # create an instance of SparkSession
    spark = SparkSession.builder.getOrCreate()

    try:
        #========================================
        # groupByKey() transformation
        #
        # source_rdd.groupByKey() --> target_rdd
        #
        # Group the values for each key in the RDD
        # into a single sequence. Hash-partitions the
        # resulting RDD with the existing partitioner/
        # parallelism level.
        #
        # Note: If you are grouping in order to perform
        # an aggregation (such as a sum or average) over
        # each key, using reduceByKey() or combineByKey()
        # will provide much better performance.
        #========================================

        # Create a list of tuples.
        # Each tuple contains name, city, and a number.
        list_of_tuples = [
            ('alex', 'Sunnyvale', 25),
            ('alex', 'Sunnyvale', 33),
            ('alex', 'Sunnyvale', 45),
            ('alex', 'Sunnyvale', 63),
            ('mary', 'Ames', 22),
            ('mary', 'Cupertino', 66),
            ('mary', 'Ames', 20),
            ('bob', 'Ames', 26),
        ]
        print("list_of_tuples = ", list_of_tuples)

        # Create an RDD from the list above.
        rdd = spark.sparkContext.parallelize(list_of_tuples)
        print("rdd = ", rdd)
        print("rdd.count() = ", rdd.count())
        print("rdd.collect() = ", rdd.collect())

        #------------------------------------
        # apply a map() transformation to rdd
        # create a (key, value) pair
        #  where
        #       key is the name (first element of tuple)
        #       value is a number
        #------------------------------------
        rdd2 = rdd.map(create_pair)
        print("rdd2 = ", rdd2)
        print("rdd2.count() = ", rdd2.count())
        print("rdd2.collect() = ", rdd2.collect())

        #------------------------------------
        # apply a groupByKey() transformation to rdd2
        # to create a (key, value) pairs (as rdd3)
        #  where
        #       key is the name
        #       value is the Iterable<number>
        #------------------------------------
        rdd3 = rdd2.groupByKey()
        print("rdd3 = ", rdd3)
        print("rdd3.count() = ", rdd3.count())
        print("rdd3.collect() = ", rdd3.collect())
        # materialize each group as a list so the values are readable
        print("rdd3.mapValues().collect() = ", rdd3.mapValues(lambda values: list(values)).collect())

        # find average per key
        # float() keeps true division regardless of the Python version;
        # every key produced by groupByKey() has at least one value,
        # so len(numbers) is never zero here.
        averages = rdd3.mapValues(lambda numbers: float(sum(numbers)) / float(len(numbers)))
        print("averages = ", averages)
        print("averages.count() = ", averages.count())
        print("averages.collect() = ", averages.collect())
    finally:
        # done! always release the Spark resources, even on failure
        spark.stop()
#end-def
#=========================================
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()

list_of_tuples =  [('alex', 'Sunnyvale', 25), ('alex', 'Sunnyvale', 33), ('alex', 'Sunnyvale', 45), ('alex', 'Sunnyvale', 63), ('mary', 'Ames', 22), ('mary', 'Cupertino', 66), ('mary', 'Ames', 20), ('bob', 'Ames', 26)]
rdd =  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
rdd.count() =  8
rdd.collect() =  [('alex', 'Sunnyvale', 25), ('alex', 'Sunnyvale', 33), ('alex', 'Sunnyvale', 45), ('alex', 'Sunnyvale', 63), ('mary', 'Ames', 22), ('mary', 'Cupertino', 66), ('mary', 'Ames', 20), ('bob', 'Ames', 26)]
rdd2 =  PythonRDD[2] at RDD at PythonRDD.scala:53
rdd2.count() =  8
rdd2.collect() =  [('alex', 25), ('alex', 33), ('alex', 45), ('alex', 63), ('mary', 22), ('mary', 66), ('mary', 20), ('bob', 26)]
rdd3 =  PythonRDD[8] at RDD at PythonRDD.scala:53
rdd3.count() =  3
rdd3.collect() =  [('alex', <pyspark.resultiterable.ResultIterable object at 0x7c60828d0fd0>), ('mary', <pyspark.resultiterable.ResultIterable object at 0x7c60828d0f40>), ('bob', <pyspark.resultiterable.Res