In [1]:

# Import necessary modules
import shutil

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rank, udf
from pyspark.sql.window import Window
from pyspark.sql.types import StringType


In [2]:
# Start a Spark session, or reuse one that already exists (getOrCreate)
spark = (
    SparkSession.builder
    .appName("RDDsExample")
    .getOrCreate()
)


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 57127)
Traceback (most recent call last):
  File "c:\PySparkTools\WinPython311\WPy64-31140\python-3.11.4.amd64\Lib\socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "c:\PySparkTools\WinPython311\WPy64-31140\python-3.11.4.amd64\Lib\socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "c:\PySparkTools\WinPython311\WPy64-31140\python-3.11.4.amd64\Lib\socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "c:\PySparkTools\WinPython311\WPy64-31140\python-3.11.4.amd64\Lib\socketserver.py", line 755, in __init__
    self.handle()
  File "C:\Program Files\Spark-3.5.5\python\pyspark\accumulators.py", line 295, in handle
    poll(accum_updates)
  File "C:\Program Files\Spark-3.5.5\python\pyspark\accumulators.py", line 267,

In [None]:
# -------------------------------
# 🟢 BASIC RDD EXAMPLES
# -------------------------------


In [3]:
# 1. Build an RDD out of an in-memory Python list
# sparkContext.parallelize() turns the local list into a distributed RDD
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)
rdd_contents = rdd.collect()  # bring the elements back as a Python list
print("RDD Contents:", rdd_contents)


RDD Contents: [1, 2, 3, 4, 5]


In [4]:
# 2. map(): apply a function to every element
# Here each number is doubled
rdd_map = rdd.map(lambda value: value * 2)
mapped_values = rdd_map.collect()
print("Mapped RDD:", mapped_values)


Mapped RDD: [2, 4, 6, 8, 10]


In [5]:
# 3. filter(): keep only the elements for which the predicate is true
def is_even(number):
    """Return True when ``number`` is divisible by 2."""
    return number % 2 == 0


# Only the even numbers survive the filter
rdd_filter = rdd.filter(is_even)
print("Filtered RDD:", rdd_filter.collect())


Filtered RDD: [2, 4]


In [6]:
# 4. reduce(): combine all elements into a single value
# The binary function adds two elements, so the result is the total
rdd_sum = rdd.reduce(lambda total, item: total + item)
print("Sum of RDD:", rdd_sum)


Sum of RDD: 15


In [None]:
# -------------------------------
# 🟡 INTERMEDIATE RDD EXAMPLES
# -------------------------------


In [7]:
# 5. flatMap(): map each element to several outputs and flatten the result
# Every sentence is split on single spaces, yielding one RDD of words
sentences = ["hello world", "how are you"]
rdd_text = spark.sparkContext.parallelize(sentences)
rdd_words = rdd_text.flatMap(lambda sentence: sentence.split(" "))
words = rdd_words.collect()
print("Words:", words)


Words: ['hello', 'world', 'how', 'are', 'you']


In [8]:
# 6. Pair RDD of (key, value) tuples and reduceByKey()
# Values that share a key are merged with the given function (here: addition)
pairs = [("a", 1), ("b", 2), ("a", 3)]
rdd_kv = spark.sparkContext.parallelize(pairs)
rdd_reduced = rdd_kv.reduceByKey(lambda left, right: left + right)
reduced_pairs = rdd_reduced.collect()
print("Reduced by Key:", reduced_pairs)


Reduced by Key: [('a', 4), ('b', 2)]


In [9]:
# 7. groupByKey(): gather all values that belong to the same key
rdd_by_key = rdd_kv.groupByKey()
# Convert each key's grouped values to a plain list before printing
rdd_grouped = rdd_by_key.mapValues(list)
print("Grouped by Key:", rdd_grouped.collect())


Grouped by Key: [('a', [1, 3]), ('b', [2])]


In [None]:
# -------------------------------
# 🔵 ADVANCED RDD EXAMPLES
# -------------------------------

In [10]:
# 8. join(): combine two pair RDDs on their keys
# Each matching key yields (key, (value_from_rdd1, value_from_rdd2))
number_pairs = [("a", 1), ("b", 2)]
fruit_pairs = [("a", "apple"), ("b", "banana")]
rdd1 = spark.sparkContext.parallelize(number_pairs)
rdd2 = spark.sparkContext.parallelize(fruit_pairs)
rdd_joined = rdd1.join(rdd2)
print("Joined RDD:", rdd_joined.collect())

Joined RDD: [('a', (1, 'apple')), ('b', (2, 'banana'))]


In [11]:
# 9. sortBy(): order elements by a key computed from each element
# The key is the second item of each tuple, i.e. the numeric value
unsorted_pairs = [("b", 2), ("a", 3), ("c", 1)]
rdd_unsorted = spark.sparkContext.parallelize(unsorted_pairs)
rdd_sorted = rdd_unsorted.sortBy(lambda pair: pair[1])
print("Sorted RDD:", rdd_sorted.collect())

Sorted RDD: [('c', 1), ('b', 2), ('a', 3)]


In [12]:
# 10. Save RDD to text file
# saveAsTextFile() writes a directory of part files and raises an error if the
# target directory already exists, so output left by a previous run is removed
# first. This keeps the cell re-runnable under "Restart & Run All".
# NOTE(review): assumes "output_rdd" resolves to the local filesystem (Spark in
# local mode); on HDFS or another remote filesystem shutil cannot clear it.
OUTPUT_DIR = "output_rdd"
shutil.rmtree(OUTPUT_DIR, ignore_errors=True)  # no-op when the directory is absent
rdd_sorted.saveAsTextFile(OUTPUT_DIR)