In [None]:
#1] Write examples for the following Spark RDD actions:
#a. count   b. countApproxDistinct
#c. first   d. top   e. min

#Solution:

# Spark RDD Actions Demo (single script)
# Actions covered:
# a) count
# b) countApproxDistinct
# c) first
# d) top
# e) min

from pyspark.sql import SparkSession

def main():
    """Demonstrate five RDD actions on a small in-memory RDD.

    Creates (or reuses) a local SparkSession, parallelizes 200 integers
    (1..100 followed by 51..150, so 51..100 appear twice) into 4 partitions,
    and prints the result of ``count``, ``countApproxDistinct``, ``first``,
    ``top`` and ``min``.

    The session is stopped in a ``finally`` clause so that a failing action
    does not leave the Spark session running in the notebook kernel.
    """
    spark = SparkSession.builder.appName("RDD Actions Demo #1").master("local[*]").getOrCreate()
    try:
        sc = spark.sparkContext
        sc.setLogLevel("ERROR")  # keep Spark's own logging out of the demo output

        # Example RDD with duplicates: 1..100 plus 51..150
        nums = list(range(1, 101)) + list(range(51, 151))
        rdd = sc.parallelize(nums, numSlices=4)

        print(f"Partitions: {rdd.getNumPartitions()}")

        # a) count: total number of elements, duplicates included
        total_count = rdd.count()
        print(f"[a] count -> {total_count}")  # expected 200

        # b) countApproxDistinct: estimated number of distinct elements
        approx_distinct = rdd.countApproxDistinct()  # default relativeSD=0.05
        print(f"[b] countApproxDistinct -> {approx_distinct}  (true distinct is 150)")

        # c) first: the first element of the RDD
        first_elem = rdd.first()
        print(f"[c] first -> {first_elem}")

        # d) top: the N largest elements, in descending order
        top5 = rdd.top(5)
        print(f"[d] top(5) -> {top5}")

        # e) min: the smallest element
        min_val = rdd.min()
        print(f"[e] min -> {min_val}")
    finally:
        # Always release the session, even if one of the actions above raised.
        spark.stop()

# Entry point: run the demo when this cell/script is executed directly
# (i.e. not when the code is imported as a module).
if __name__ == "__main__":
    main()

Partitions: 4
[a] count -> 200
[b] countApproxDistinct -> 145  (true distinct is 150)
[c] first -> 1
[d] top(5) -> [150, 149, 148, 147, 146]
[e] min -> 1


In [None]:
#2] Write examples of Spark Pair RDD functions.

#Solution:


In [None]:
# Spark Pair RDD Functions Demo (single script)
# Covered:
# - Transformations: keys, values, mapValues, flatMapValues, reduceByKey, foldByKey,
#   aggregateByKey, combineByKey, groupByKey, sortByKey, subtractByKey, join,
#   leftOuterJoin, rightOuterJoin, fullOuterJoin, cogroup, sampleByKey, partitionBy,
#   repartitionAndSortWithinPartitions
# - Actions: countByKey, lookup

from pyspark.sql import SparkSession
from operator import add

def main():
    """Demonstrate the common Pair RDD transformations and actions.

    Creates (or reuses) a local SparkSession and builds two small key/value
    RDDs:

    * ``kv``  - (str, int) pairs with keys a, b, c, d (3 partitions)
    * ``kv2`` - (str, str) pairs with keys a, b, d, e (2 partitions)

    It then prints the result of each Pair RDD function in turn. Results
    coming out of a shuffle are sorted on the driver where the original code
    did so, to keep the printed output readable.

    The session is stopped in a ``finally`` clause so that a failing step
    does not leave the Spark session running in the notebook kernel.
    """
    spark = SparkSession.builder.appName("Pair RDD Functions Demo").master("local[*]").getOrCreate()
    try:
        sc = spark.sparkContext
        sc.setLogLevel("ERROR")  # keep Spark's own logging out of the demo output

        # Sample Pair RDDs
        data = [
            ("a", 1), ("b", 2), ("a", 3), ("c", 5),
            ("b", 1), ("a", 2), ("d", 4)
        ]
        info = [
            ("a", "alpha"), ("b", "beta"), ("d", "delta"), ("e", "epsilon")
        ]

        kv = sc.parallelize(data, numSlices=3)
        kv2 = sc.parallelize(info, numSlices=2)

        print(f"kv partitions: {kv.getNumPartitions()}, kv2 partitions: {kv2.getNumPartitions()}")
        print("kv ->", kv.collect())
        print("kv2 ->", kv2.collect())

        # keys, values
        print("\n-- keys(), values() --")
        print("keys():", kv.keys().collect())
        print("values():", kv.values().collect())

        # mapValues, flatMapValues
        print("\n-- mapValues, flatMapValues --")
        print("mapValues(*10):", kv.mapValues(lambda v: v * 10).collect())
        # Each (k, v) expands to (k, 1), (k, 2), ..., (k, v); only the first 12 are shown.
        print("flatMapValues(range(1, v+1)) sample:", kv.flatMapValues(lambda v: range(1, v + 1)).take(12))

        # reduceByKey, foldByKey
        print("\n-- reduceByKey, foldByKey --")
        bykey_sum = kv.reduceByKey(add)
        print("reduceByKey(sum):", sorted(bykey_sum.collect()))
        fold_sum = kv.foldByKey(0, add)
        print("foldByKey(sum, zero=0):", sorted(fold_sum.collect()))

        # aggregateByKey: (sum, count) -> avg
        print("\n-- aggregateByKey (avg per key) --")
        agg = kv.aggregateByKey((0, 0),
                                lambda acc, v: (acc[0] + v, acc[1] + 1),      # fold a value into (sum, count)
                                lambda a, b: (a[0] + b[0], a[1] + b[1]))      # merge two (sum, count) accumulators
        # The parameter is deliberately not named `sc`, to avoid shadowing the SparkContext.
        avg_by_key = agg.mapValues(lambda sum_count: sum_count[0] / sum_count[1])
        print("aggregateByKey -> (sum,count):", sorted(agg.collect()))
        print("avg_by_key:", sorted(avg_by_key.collect()))

        # combineByKey: another way to compute avg
        print("\n-- combineByKey (avg per key) --")
        comb = kv.combineByKey(lambda v: (v, 1),                              # first value seen for a key
                               lambda acc, v: (acc[0] + v, acc[1] + 1),       # fold a value into (sum, count)
                               lambda a, b: (a[0] + b[0], a[1] + b[1]))       # merge two (sum, count) accumulators
        avg_by_key2 = comb.mapValues(lambda sum_count: sum_count[0] / sum_count[1])
        print("combineByKey -> (sum,count):", sorted(comb.collect()))
        print("avg_by_key2:", sorted(avg_by_key2.collect()))

        # groupByKey (use with care; can be heavy)
        print("\n-- groupByKey --")
        grouped = kv.groupByKey().mapValues(lambda it: sorted(list(it)))
        print("groupByKey ->", sorted(grouped.collect()))

        # sortByKey
        print("\n-- sortByKey --")
        print("sortByKey(asc):", kv.sortByKey(ascending=True).collect())
        print("sortByKey(desc):", kv.sortByKey(ascending=False).collect())

        # subtractByKey: pairs of kv whose key does not appear in kv2
        print("\n-- subtractByKey --")
        print("kv.subtractByKey(kv2):", kv.subtractByKey(kv2).collect())

        # Joins
        print("\n-- joins --")
        print("join:", sorted(kv.join(kv2).collect(), key=lambda x: (x[0], x[1])))
        # Outer joins can contain None values, so these are sorted on the key only.
        print("leftOuterJoin:", sorted(kv.leftOuterJoin(kv2).collect(), key=lambda x: x[0]))
        print("rightOuterJoin:", sorted(kv.rightOuterJoin(kv2).collect(), key=lambda x: x[0]))
        print("fullOuterJoin:", sorted(kv.fullOuterJoin(kv2).collect(), key=lambda x: x[0]))

        # cogroup (values from multiple RDDs grouped per key)
        print("\n-- cogroup --")
        co = kv.cogroup(kv2).mapValues(lambda t: (sorted(list(t[0])), sorted(list(t[1]))))
        print("cogroup:", sorted(co.collect()))

        # Actions: countByKey, lookup
        print("\n-- actions: countByKey, lookup --")
        print("countByKey:", dict(kv.countByKey()))
        print("lookup('a'):", kv.lookup("a"))

        # sampleByKey (probabilistic); one sampling fraction per key of kv, fixed seed
        print("\n-- sampleByKey --")
        fractions = {"a": 1.0, "b": 0.5, "c": 1.0, "d": 0.0}
        sampled = kv.sampleByKey(withReplacement=False, fractions=fractions, seed=42)
        print("sampleByKey:", sorted(sampled.collect()))

        # partitionBy and repartitionAndSortWithinPartitions
        print("\n-- partitionBy, repartitionAndSortWithinPartitions --")
        kv_part = kv.partitionBy(2)
        print("partitionBy(2) -> partitions:", kv_part.getNumPartitions())
        rs = kv.repartitionAndSortWithinPartitions(2)
        # Emit one (partition index, contents) pair per partition to show the layout.
        parts = rs.mapPartitionsWithIndex(lambda idx, it: [(idx, list(it))]).collect()
        print("repartitionAndSortWithinPartitions(2):", sorted(parts, key=lambda x: x[0]))
    finally:
        # Always release the session, even if one of the steps above raised.
        spark.stop()

# Entry point: run the demo when this cell/script is executed directly
# (i.e. not when the code is imported as a module).
if __name__ == "__main__":
    main()

kv partitions: 3, kv2 partitions: 2
kv -> [('a', 1), ('b', 2), ('a', 3), ('c', 5), ('b', 1), ('a', 2), ('d', 4)]
kv2 -> [('a', 'alpha'), ('b', 'beta'), ('d', 'delta'), ('e', 'epsilon')]

-- keys(), values() --
keys(): ['a', 'b', 'a', 'c', 'b', 'a', 'd']
values(): [1, 2, 3, 5, 1, 2, 4]

-- mapValues, flatMapValues --
mapValues(*10): [('a', 10), ('b', 20), ('a', 30), ('c', 50), ('b', 10), ('a', 20), ('d', 40)]
flatMapValues(range(1, v+1)) sample: [('a', 1), ('b', 1), ('b', 2), ('a', 1), ('a', 2), ('a', 3), ('c', 1), ('c', 2), ('c', 3), ('c', 4), ('c', 5), ('b', 1)]

-- reduceByKey, foldByKey --
reduceByKey(sum): [('a', 6), ('b', 3), ('c', 5), ('d', 4)]
foldByKey(sum, zero=0): [('a', 6), ('b', 3), ('c', 5), ('d', 4)]

-- aggregateByKey (avg per key) --
aggregateByKey -> (sum,count): [('a', (6, 3)), ('b', (3, 2)), ('c', (5, 1)), ('d', (4, 1))]
avg_by_key: [('a', 2.0), ('b', 1.5), ('c', 5.0), ('d', 4.0)]

-- combineByKey (avg per key) --
combineByKey -> (sum,count): [('a', (6, 3)), ('b', (3