Here are 25 beginner-to-intermediate assignment questions on PySpark RDDs:
- Create an RDD from a list of integers and print the RDD.
- Create an RDD from a text file and print the first 10 lines.
- Count the number of elements in an RDD.
- Find the sum of all elements in an RDD of integers.
- Filter out the even numbers from an RDD of integers.
- Map the values of an RDD of integers to their squares.
- Reduce an RDD of integers to their product.
- Calculate the average of elements in an RDD of integers.
- Find the maximum value in an RDD of integers.
- Find the minimum value in an RDD of integers.
- Calculate the standard deviation of elements in an RDD of integers.
- Create a new RDD by joining two existing RDDs.
- Find the common elements between two RDDs.
- Subtract the elements of one RDD from another RDD.
- Compute the Cartesian product of two RDDs.
- Group the elements of an RDD by key.
- Sort an RDD of tuples by key.
- Create an RDD of key-value pairs from a text file and count the number of occurrences of each key.
- Find the most frequently occurring value in an RDD.
- Apply a function to each element of an RDD using mapPartitions.
- Combine the values of an RDD using reduceByKey.
- Compute the average of values for each key in an RDD of key-value pairs using combineByKey.
- Find the top 5 values for each key in an RDD of key-value pairs.
- Compute the frequency distribution of elements in an RDD.
- Find the unique elements in an RDD.

These questions should give you a good foundation in using PySpark RDDs. As you progress, you can try more advanced tasks such as graph processing, machine learning, and streaming data analysis.

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession\
.builder\
.appName("practice_rdd")\
.getOrCreate()

23/05/04 15:35:32 WARN Utils: Your hostname, joker021-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/05/04 15:35:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/04 15:35:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# Create an RDD from a list of integers and print the RDD.
l = list(range(10))
rdd = spark.sparkContext.parallelize(l)
rdd.glom().collect()

[[0, 1], [2, 3], [4, 5], [6, 7, 8, 9]]

In [16]:
# Create an RDD from a text file and print the first 10 lines.
no_of_lines = 10
path = "spark_text_file.txt"
rdd = spark.sparkContext.textFile(path)
rdd.take(no_of_lines)

['this is ',
 'random text file',
 'where I use to ',
 'parse the',
 'data',
 'in ',
 'PySprak and ',
 'I forgot what is was thinking',
 'Ahh yesss',
 'I need to show first 10 lines in ']

In [17]:
# Variant: wholeTextFiles returns one (path, content) pair per matching file, so take() here limits files, not lines.
no_of_lines = 10
path = "*.txt"
rdd = spark.sparkContext.wholeTextFiles(path)
rdd.take(no_of_lines)

[('file:/home/joker021/Documents/Practice_Spark/Spark_ChatGPT/spark_text_file (another copy).txt',
  'this is \nrandom text file\nwhere I use to \nparse the\ndata\nin \nPySprak and \nI forgot what is was thinking\nAhh yesss\nI need to show first 10 lines in \nRdd as outpur\nDid we wtote 10 lines?\ni dont know lets see\n'),
 ('file:/home/joker021/Documents/Practice_Spark/Spark_ChatGPT/spark_text_file (copy).txt',
  'this is \nrandom text file\nwhere I use to \nparse the\ndata\nin \nPySprak and \nI forgot what is was thinking\nAhh yesss\nI need to show first 10 lines in \nRdd as outpur\nDid we wtote 10 lines?\ni dont know lets see\n'),
 ('file:/home/joker021/Documents/Practice_Spark/Spark_ChatGPT/spark_text_file.txt',
  'this is \nrandom text file\nwhere I use to \nparse the\ndata\nin \nPySprak and \nI forgot what is was thinking\nAhh yesss\nI need to show first 10 lines in \nRdd as outpur\nDid we wtote 10 lines?\ni dont know lets see\n')]

In [20]:
# Count the number of elements in an RDD.
l = list(range(10))
rdd = spark.sparkContext.parallelize(l)
rdd.count()

10

In [21]:
# Find the sum of all elements in an RDD of integers.
l = list(range(10))
rdd = spark.sparkContext.parallelize(l)
rdd.sum()

45

In [23]:
# Filter out the even numbers from an RDD of integers.
l = list(range(10))
rdd = spark.sparkContext.parallelize(l)
rdd_even_num = rdd.filter(lambda x: x%2==0)
rdd_even_num.glom().collect()

[[0], [2], [4], [6, 8]]

In [28]:
# Map the values of an RDD of integers to their squares.
l = list(range(10))
rdd = spark.sparkContext.parallelize(l)
rdd_sqr_map = rdd.map(lambda x: x**2)
rdd_sqr_map.glom().collect()

[[0, 1], [4, 9], [16, 25], [36, 49, 64, 81]]

In [30]:
# Reduce an RDD of integers to their product.
l = list(range(1, 10+1))
rdd = spark.sparkContext.parallelize(l)
rdd_prod = rdd.reduce(lambda a, b: a*b)
rdd_prod

3628800
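
As a local cross-check of the product above, the same fold can be sketched in plain Python, outside Spark. This is a minimal sketch using only the standard library; the names `local_product`, `chunks`, and `merged_product` are illustrative and not part of the notebook. It also shows why `reduce` on an RDD expects an associative and commutative operation: partial results computed per chunk can be merged afterwards and still give the same answer.

```python
from functools import reduce
from operator import mul

# Same input as the cell above: the integers 1..10.
values = list(range(1, 10 + 1))

# Fold the whole list with multiplication.
local_product = reduce(mul, values)

# Fold each chunk separately, then merge the partial products.
# The chunk boundaries are arbitrary and only stand in for partitions.
chunks = [values[:3], values[3:7], values[7:]]
merged_product = reduce(mul, (reduce(mul, chunk) for chunk in chunks))

print(local_product, merged_product)
```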

In [31]:
# Calculate the average of elements in an RDD of integers.
l = list(range(10))
rdd = spark.sparkContext.parallelize(l)
rdd.mean()

4.5

In [33]:
# Find the maximum value in an RDD of integers.
l = list(range(10))
rdd = spark.sparkContext.parallelize(l)
rdd.max()

# Using Reduce
# rdd.reduce(lambda a,b: a if a>b else b)

9

In [36]:
# Find the minimum value in an RDD of integers.
l = list(range(10))
rdd = spark.sparkContext.parallelize(l)
rdd.min()

0

In [37]:
# Calculate the standard deviation of elements in an RDD of integers.
l = list(range(10))
rdd = spark.sparkContext.parallelize(l)
rdd.stdev()

2.8722813232690143
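
The value above is the population standard deviation (squared deviations divided by n). A minimal sketch with the standard library `statistics` module can confirm this locally and show the n - 1 variant for comparison; on the Spark side, `rdd.sampleStdev()` is the method for the sample version. The variable names here are illustrative.

```python
import math
import statistics

values = list(range(10))

# Population standard deviation: divide the summed squared deviations by n.
population_sd = statistics.pstdev(values)

# Sample standard deviation: divide by n - 1 instead.
sample_sd = statistics.stdev(values)

print(population_sd)
print(sample_sd)
```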

In [12]:
# Create a new RDD by joining two existing RDDs.
# First attempt: union simply concatenates the two RDDs; the key-based join is in the next cell.
l1 = list(range(10))
rdd1 = spark.sparkContext.parallelize(l1)
# rdd1_key = rdd1.map(lambda x: (1, x))

l2 = list(range(100, 90, -1))
rdd2 = spark.sparkContext.parallelize(l2)
# rdd2_key = rdd2.map(lambda x: (1, x))

rdd_join = rdd1.union(rdd2)

rdd_join.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91]

In [21]:
# Create a new RDD by joining two existing RDDs.
l1 = list(range(10))
rdd1 = spark.sparkContext.parallelize(l1)
# rdd1_key = rdd1.map(lambda x: (1, x))
rdd1_key = rdd1.zipWithIndex().map(lambda x: (x[1], x[0]))

l2 = list(range(100, 90, -1))
rdd2 = spark.sparkContext.parallelize(l2)
# rdd2_key = rdd2.map(lambda x: (1, x))
rdd2_key = rdd2.zipWithIndex().map(lambda x: (x[1], x[0]))

rdd_join = rdd1_key.join(rdd2_key)

rdd_join.collect()

[(0, (0, 100)),
 (8, (8, 92)),
 (1, (1, 99)),
 (9, (9, 91)),
 (2, (2, 98)),
 (3, (3, 97)),
 (4, (4, 96)),
 (5, (5, 95)),
 (6, (6, 94)),
 (7, (7, 93))]

In [15]:
rdd1.zipWithIndex().collect()

[(0, 0),
 (1, 1),
 (2, 2),
 (3, 3),
 (4, 4),
 (5, 5),
 (6, 6),
 (7, 7),
 (8, 8),
 (9, 9)]

In [19]:
# Find the common elements between two RDDs.
l1 = list(range(10))
rdd1 = spark.sparkContext.parallelize(l1)

l2 = list(range(10, 3, -1))
rdd2 = spark.sparkContext.parallelize(l2)

rdd_common = rdd1.intersection(rdd2)
rdd_common.glom().collect()

[[8], [9], [], [], [4], [5], [6], [7]]

In [35]:
# Subtract the elements of one RDD from another RDD.
l1 = list(range(10))
rdd1 = spark.sparkContext.parallelize(l1)

l2 = list(range(10, 3, -1))
rdd2 = spark.sparkContext.parallelize(l2)

rdd1_only = rdd1.subtract(rdd2)
rdd1_only.glom().collect()

[[0], [1], [2], [3], [], [], [], []]

In [38]:
# Compute the Cartesian product of two RDDs.
l1 = list(range(10))
rdd1 = spark.sparkContext.parallelize(l1)

l2 = list(range(10, 3, -1))
rdd2 = spark.sparkContext.parallelize(l2)

rdd_cartesian = rdd1.cartesian(rdd2)

rdd_cartesian.glom().collect()

[[(0, 10), (1, 10)],
 [(0, 9), (1, 9), (0, 8), (1, 8)],
 [(0, 7), (1, 7), (0, 6), (1, 6)],
 [(0, 5), (1, 5), (0, 4), (1, 4)],
 [(2, 10), (3, 10)],
 [(2, 9), (3, 9), (2, 8), (3, 8)],
 [(2, 7), (3, 7), (2, 6), (3, 6)],
 [(2, 5), (3, 5), (2, 4), (3, 4)],
 [(4, 10), (5, 10)],
 [(4, 9), (5, 9), (4, 8), (5, 8)],
 [(4, 7), (5, 7), (4, 6), (5, 6)],
 [(4, 5), (5, 5), (4, 4), (5, 4)],
 [(6, 10), (7, 10), (8, 10), (9, 10)],
 [(6, 9), (7, 9), (6, 8), (7, 8), (8, 9), (9, 9), (8, 8), (9, 8)],
 [(6, 7), (7, 7), (6, 6), (7, 6), (8, 7), (9, 7), (8, 6), (9, 6)],
 [(6, 5), (7, 5), (6, 4), (7, 4), (8, 5), (9, 5), (8, 4), (9, 4)]]

In [53]:
# Group the elements of an RDD by key.
l1 = list(range(10))
rdd1 = spark.sparkContext.parallelize(l1)
# rdd1_key = rdd1.map(lambda x: (1, x))
rdd1_key = rdd1.zipWithIndex().map(lambda x: (x[1], x[0]))

l2 = list(range(10))
rdd2 = spark.sparkContext.parallelize(l2)
# rdd2_key = rdd2.map(lambda x: (1, x))
rdd2_key = rdd2.zipWithIndex().map(lambda x: (x[1], x[0]))

rdd_all = rdd1_key.union(rdd2_key)

rdd_all.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

                                                                                

[(0, [0, 0]),
 (8, [8, 8]),
 (1, [1, 1]),
 (9, [9, 9]),
 (2, [2, 2]),
 (3, [3, 3]),
 (4, [4, 4]),
 (5, [5, 5]),
 (6, [6, 6]),
 (7, [7, 7])]

In [56]:
# Sort an RDD of tuples by key
l = list(range(10, 0, -1))
rdd = spark.sparkContext.parallelize(l)
rdd_k = rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
rdd_k.sortByKey().glom().collect()

[[(0, 10), (1, 9), (2, 8)],
 [(3, 7), (4, 6), (5, 5)],
 [(6, 4), (7, 3)],
 [(8, 2), (9, 1)]]

In [61]:
# Create an RDD of key-value pairs from a text file and count the number of occurrences of each key.
path = "spark_text_file.txt"
rdd = spark.sparkContext.textFile(path)
rdd_word = rdd.flatMap(lambda x: x.split())
# rdd_word.countByValue()
rdd_word.map(lambda x: (x, 1)).countByKey()

defaultdict(int,
            {'this': 1,
             'is': 2,
             'random': 1,
             'text': 1,
             'file': 1,
             'where': 1,
             'I': 3,
             'use': 1,
             'to': 2,
             'parse': 1,
             'the': 1,
             'data': 1,
             'in': 2,
             'PySprak': 1,
             'and': 1,
             'forgot': 1,
             'what': 1,
             'was': 1,
             'thinking': 1,
             'Ahh': 1,
             'yesss': 1,
             'need': 1,
             'show': 1,
             'first': 1,
             '10': 2,
             'lines': 1,
             'Rdd': 1,
             'as': 1,
             'outpur': 1,
             'Did': 1,
             'we': 1,
             'wtote': 1,
             'lines?': 1,
             'i': 1,
             'dont': 1,
             'know': 1,
             'lets': 1,
             'see': 1})

In [74]:
# Find the most frequently occurring value in an RDD.
# Count each word with reduceByKey, then take the (word, count) pair with the largest count.
path = "spark_text_file.txt"
rdd = spark.sparkContext.textFile(path)
rdd_word = rdd.flatMap(lambda x: x.split())
# rdd_word.countByValue()
rdd_word_map = rdd_word.map(lambda x: (x, 1))
# rdd_word.map(lambda x: (x, 1)).countByKey()
rdd_word_count = rdd_word_map.reduceByKey(lambda a,b:a+b)

rdd_word_count.max(lambda x: x[1])
# rdd_word_count.collect()

('Iam', 6)

In [106]:
# Apply a function to each element of an RDD using mapPartitions.
l = list(range(10))
rdd = spark.sparkContext.parallelize(l)
# rdd.mapPartitions(lambda x: sum(x)).glom().collect()
# rdd.mapPartitions(lambda it: (x*x for x in it)).glom().collect()
rdd.mapPartitions(lambda it: [sum(it)]).glom().collect()

[[1], [5], [9], [30]]
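
The function passed to `mapPartitions` receives an iterator over one partition and has to return an iterable, which is why the working call wraps the sum in a list. The sketch below models that contract in plain Python; `map_partitions` is a hypothetical local stand-in, not a PySpark function, and the partition layout is copied from the `glom()` output earlier in this notebook.

```python
# Partition layout copied from the glom() output of the first cell.
partitions = [[0, 1], [2, 3], [4, 5], [6, 7, 8, 9]]

def map_partitions(parts, func):
    """Hypothetical local stand-in for mapPartitions: call func once per
    partition with an iterator and materialise the iterable it returns."""
    return [list(func(iter(part))) for part in parts]

# One output element per partition: the partition sum wrapped in a list.
sums = map_partitions(partitions, lambda it: [sum(it)])

# One output element per input element: a generator of squares.
squares = map_partitions(partitions, lambda it: (x * x for x in it))

print(sums)
print(squares)
```

Returning a bare `sum(it)` would fail in this model for the same reason as the commented-out attempt: an integer is not iterable.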

In [108]:
# Combine the values of an RDD using reduceByKey.
l = list(range(10))
rdd = spark.sparkContext.parallelize(l)
rdd_odd_even = rdd.map(lambda x: (x%2, x))
rdd_odd_even.reduceByKey(lambda a,b:a+b).glom().collect()

[[(0, 20)], [(1, 25)], [], []]

In [121]:
# Compute the average of values for each key in an RDD of key-value pairs using combineByKey.
l = list(range(10))
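rdd = spark.sparkContext.parallelize(l)
# Key each value by parity here so the cell does not depend on the earlier reduceByKey cell.
rdd_odd_even = rdd.map(lambda x: (x%2, x))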

# rdd_odd_even.combineByKey(createCombiner=lambda x: x, mergeValue=lambda a,b:a+b, mergeCombiners=lambda a,b:a+b).collect()
rdd_odd_even.combineByKey(
    createCombiner=lambda x: (x, 1), 
    mergeValue=lambda a, b: (a[0]+b, a[1]+1), 
    mergeCombiners=lambda a, b: (a[0]+b[0], a[1]+b[1])
).map(lambda x: (x[0], x[1][0]/x[1][1])).collect()


[(0, 4.0), (1, 5.0)]
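
To make the roles of the three `combineByKey` functions concrete, here is a rough local model in plain Python. `combine_by_key` is a hypothetical helper written for this sketch, and the partition split is an assumption that mirrors the four-partition layout seen earlier; real Spark may group records differently, but the final averages do not depend on the split.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Hypothetical local model of combineByKey over a list of partitions."""
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            if key in acc:
                # Key already seen in this partition: fold the value in.
                acc[key] = merge_value(acc[key], value)
            else:
                # First value for this key in this partition.
                acc[key] = create_combiner(value)
        per_partition.append(acc)

    # Merge the per-partition combiners key by key.
    merged = {}
    for acc in per_partition:
        for key, comb in acc.items():
            merged[key] = merge_combiners(merged[key], comb) if key in merged else comb
    return merged

pairs = [(x % 2, x) for x in range(10)]
partitions = [pairs[0:2], pairs[2:4], pairs[4:6], pairs[6:10]]

# Carry (sum, count) per key, exactly as in the cell above.
sum_count = combine_by_key(
    partitions,
    create_combiner=lambda v: (v, 1),
    merge_value=lambda acc, v: (acc[0] + v, acc[1] + 1),
    merge_combiners=lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
averages = {key: total / count for key, (total, count) in sum_count.items()}
print(averages)
```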

In [128]:
# Find the top 5 values for each key in an RDD of key-value pairs.
l = list(range(100))
rdd = spark.sparkContext.parallelize(l)
rdd_k = rdd.map(lambda x: (x%10, x))
rdd_k.groupByKey().map(lambda x: (x[0], sorted(list(x[1]), reverse=True)[:5])).collect()

[(0, [90, 80, 70, 60, 50]),
 (4, [94, 84, 74, 64, 54]),
 (8, [98, 88, 78, 68, 58]),
 (1, [91, 81, 71, 61, 51]),
 (5, [95, 85, 75, 65, 55]),
 (9, [99, 89, 79, 69, 59]),
 (2, [92, 82, 72, 62, 52]),
 (6, [96, 86, 76, 66, 56]),
 (3, [93, 83, 73, 63, 53]),
 (7, [97, 87, 77, 67, 57])]

In [144]:
def append(a, b):
    # reduceByKey can hand over a bare value or an already merged list on
    # either side, so normalise both before concatenating. Dropping a bare
    # first value here would silently lose elements.
    a = a if isinstance(a, list) else [a]
    b = b if isinstance(b, list) else [b]
    return a + b
rdd_k.reduceByKey(append).map(lambda x: (x[0], sorted(list(x[1]), reverse=True)[:5])).collect()

[(0, [90, 80, 70, 60, 50]),
 (4, [94, 84, 74, 64, 54]),
 (8, [98, 88, 78, 68, 58]),
 (1, [91, 81, 71, 61, 51]),
 (5, [95, 85, 75, 65, 55]),
 (9, [99, 89, 79, 69, 59]),
 (2, [92, 82, 72, 62, 52]),
 (6, [96, 86, 76, 66, 56]),
 (3, [93, 83, 73, 63, 53]),
 (7, [97, 87, 77, 67, 57])]

In [148]:
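def append(a, b):
    # a is always a list here because createCombiner wraps the first value;
    # b is a single value in mergeValue and a list in mergeCombiners.
    merged = a + (b if isinstance(b, list) else [b])
    # Trim to the five largest at every step so the combiners stay small.
    return sorted(merged, reverse=True)[:5]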
rdd_k.combineByKey(
    createCombiner=lambda x: [x],
    mergeValue=append,
    mergeCombiners=append
).collect()



[(0, [90, 80, 70, 60, 50]),
 (4, [94, 84, 74, 64, 54]),
 (8, [98, 88, 78, 68, 58]),
 (1, [91, 81, 71, 61, 51]),
 (5, [95, 85, 75, 65, 55]),
 (9, [99, 89, 79, 69, 59]),
 (2, [92, 82, 72, 62, 52]),
 (6, [96, 86, 76, 66, 56]),
 (3, [93, 83, 73, 63, 53]),
 (7, [97, 87, 77, 67, 57])]
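
The bounded accumulator used above can also be written with `heapq.nlargest`, which avoids sorting by hand. The sketch below is plain Python and only a local check: `keep_top_n`, `merge_top_n`, and `top_n_per_key` are hypothetical names, and splitting the data into two halves merely imitates two partitions. In Spark, the first two functions should be usable as the `seqFunc` and `combFunc` of `aggregateByKey` with an empty list as the zero value, though that pairing is not run here.

```python
import heapq

N = 5  # how many values to keep per key

def keep_top_n(acc, value):
    # Fold one value into the accumulator and keep the N largest, descending.
    return heapq.nlargest(N, acc + [value])

def merge_top_n(a, b):
    # Merge two accumulators and keep the N largest, descending.
    return heapq.nlargest(N, a + b)

def top_n_per_key(pairs):
    # Build one bounded accumulator per key for a single chunk of pairs.
    acc = {}
    for key, value in pairs:
        acc[key] = keep_top_n(acc.get(key, []), value)
    return acc

pairs = [(x % 10, x) for x in range(100)]

# Two halves stand in for two partitions.
left = top_n_per_key(pairs[:50])
right = top_n_per_key(pairs[50:])

top5 = {
    key: merge_top_n(left.get(key, []), right.get(key, []))
    for key in sorted(set(left) | set(right))
}
print(top5[0])
print(top5[7])
```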

In [160]:
# Compute the frequency distribution of elements in an RDD.
l = list(map(lambda x: x%10, range(100)))
l.extend([101, 102, 103, 104])
rdd = spark.sparkContext.parallelize(l)
rdd.countByValue()

defaultdict(int,
            {0: 10,
             1: 10,
             2: 10,
             3: 10,
             4: 10,
             5: 10,
             6: 10,
             7: 10,
             8: 10,
             9: 10,
             101: 1,
             102: 1,
             103: 1,
             104: 1})

In [162]:
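# Find the unique elements in an RDD ("unique" is read here as "occurs exactly once").
l = list(map(lambda x: x%10, range(100)))
l.extend([101, 102, 103, 104])
rdd = spark.sparkContext.parallelize(l)
# Approach 1: count with reduceByKey. Only the last expression in a cell is displayed, so this result is not shown.
rdd.map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b).filter(lambda x: (x[1]==1)).collect()
# Approach 2: the same count with combineByKey.
rdd.map(lambda x: (x, 1)).combineByKey(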
    createCombiner=lambda x: 1,
    mergeValue=lambda acc,v: acc+1,
    mergeCombiners=lambda acc1, acc2: acc1+acc2
).filter(lambda x: (x[1]==1)).collect()

[(104, 1), (101, 1), (102, 1), (103, 1)]
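
The question can be read two ways: the set of distinct values, or the values that appear exactly once. The output above answers the second reading. A small plain-Python sketch with `collections.Counter` shows both side by side on the same data; the variable names are illustrative. For the first reading on the Spark side, `rdd.distinct()` is the usual call.

```python
from collections import Counter

# Same data as the cell above.
values = [x % 10 for x in range(100)] + [101, 102, 103, 104]
counts = Counter(values)

# Reading 1: every value that appears at least once.
distinct_values = sorted(counts)

# Reading 2: only the values that appear exactly once.
seen_once = sorted(value for value, n in counts.items() if n == 1)

print(distinct_values)
print(seen_once)
```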