In [1]:
import os

# Make sure the data directory exists. exist_ok=True replaces the separate
# os.path.exists() check, which could race with another process creating it.
os.makedirs('data', exist_ok=True)

# Sentences that become the lines of the sample file
sample_text = [
    "Apache Spark is fast",
    "Spark provides RDDs",
    "RDDs are fundamental",
    "Processing big data"
]

# Write one sentence per line (no trailing newline). The encoding is pinned so
# the file does not depend on the platform's default text encoding.
with open('data/sample.txt', 'w', encoding='utf-8') as f:
    f.write('\n'.join(sample_text))

print("✅ Sample data created in data/sample.txt")

✅ Sample data created in data/sample.txt


In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session, pointing it at the cluster master and
# giving the driver and each executor 1 GB of memory.
spark = (
    SparkSession.builder
    .appName("SparkExample")
    .master("spark://spark-master:7077")
    .config("spark.driver.memory", "1g")
    .config("spark.executor.memory", "1g")
    .getOrCreate()
)

# Understanding RDDs

An RDD (Resilient Distributed Dataset) is the foundation of Apache Spark. Think of it as a distributed collection of elements that can be processed in parallel. RDDs are immutable, meaning once created, they cannot be changed - instead, each transformation creates a new RDD.

## Creating RDDs

We can create RDDs in several ways. Let's look at each method with examples:

In [5]:
# Turn a small local list into an RDD
data = [1, 2, 3, 4, 5]
rdd1 = spark.sparkContext.parallelize(data)

print("Original data:", data)
print("RDD content:", rdd1.collect())

# map() derives a new RDD holding the square of every element
squared = rdd1.map(lambda value: value ** 2)
print("\nSquared numbers:", squared.collect())

# filter() keeps only the squares that are divisible by two
evens = squared.filter(lambda value: not value % 2)
print("Even squares:", evens.collect())

Original data: [1, 2, 3, 4, 5]
RDD content: [1, 2, 3, 4, 5]

Squared numbers: [1, 4, 9, 16, 25]
Even squares: [4, 16]


## Working with Text Data
Let's create and process some text data:

In [6]:
# Build an RDD whose elements are short sentences
texts = [
    "Apache Spark is fast",
    "Spark has RDDs",
    "RDDs are fundamental",
]
text_rdd = spark.sparkContext.parallelize(texts)

# Bring the sentences back to the driver and show each one indented
print("Our text data:")
for sentence in text_rdd.collect():
    print(f"  {sentence}")

Our text data:
  Apache Spark is fast
  Spark has RDDs
  RDDs are fundamental


In [8]:
# Tokenise: flatMap emits every whitespace-separated word of every sentence
words = text_rdd.flatMap(lambda sentence: sentence.split())
print("\nIndividual words:", words.collect())

# Pair each word with an initial count of one
word_pairs = words.map(lambda token: (token, 1))
print("\nWord-count pairs:", word_pairs.collect())

# Add up the ones belonging to each distinct word
word_counts = word_pairs.reduceByKey(lambda left, right: left + right)
print("\nWord frequencies:")
for token, total in word_counts.collect():
    print(f"  {token}: {total}")


Individual words: ['Apache', 'Spark', 'is', 'fast', 'Spark', 'has', 'RDDs', 'RDDs', 'are', 'fundamental']

Word-count pairs: [('Apache', 1), ('Spark', 1), ('is', 1), ('fast', 1), ('Spark', 1), ('has', 1), ('RDDs', 1), ('RDDs', 1), ('are', 1), ('fundamental', 1)]

Word frequencies:
  Apache: 1
  Spark: 2
  is: 1
  are: 1
  fundamental: 1
  fast: 1
  has: 1
  RDDs: 2


## Understanding Transformations vs Actions


In [9]:
# Starting RDD: the integers 1 through 10
numbers = range(1, 11)
base_rdd = spark.sparkContext.parallelize(numbers)

print("Starting with numbers:", [n for n in numbers])

# TRANSFORMATIONS are lazy: these two lines only describe the work to do,
# nothing is computed yet
doubled = base_rdd.map(lambda n: 2 * n)
filtered = doubled.filter(lambda n: 10 < n)

print("\nTransformations defined but not yet executed...")

# ACTIONS such as collect() and count() are what make Spark run the pipeline
print("\nNow executing transformations through actions:")
print("Doubled numbers:", doubled.collect())
print("Numbers > 10:", filtered.collect())
print("Count of numbers > 10:", filtered.count())

Starting with numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Transformations defined but not yet executed...

Now executing transformations through actions:
Doubled numbers: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
Numbers > 10: [12, 14, 16, 18, 20]
Count of numbers > 10: 5


# Advanced operations

In [None]:
# Key/value records: (name, number) pairs, with "Spark" appearing twice
complex_data = [("Spark", 100), ("Hadoop", 80), ("Spark", 90), ("Flink", 95)]
complex_rdd = spark.sparkContext.parallelize(complex_data)

# groupByKey gathers every value that shares a key; mapValues(list) turns each
# grouped iterable into a plain list so it prints readably
grouped = complex_rdd.groupByKey().mapValues(list)
print("Grouped by key:")
for key, values in grouped.collect():
    print(f"  {key}: {values}")

# reduceByKey folds the values of each key into a single sum
summed = complex_rdd.reduceByKey(lambda left, right: left + right)
print("\nSummed by key:")
for key, total in summed.collect():
    print(f"  {key}: {total}")

In [None]:
import time

# Larger dataset for the timing test: the integers 1 to 10 million.
# The range is passed to parallelize() as-is instead of being wrapped in
# list(...), so the driver never has to build a 10-million-element list.
# This matches how the other cells hand a range to parallelize().
big_data = range(1, 10_000_001)
big_rdd = spark.sparkContext.parallelize(big_data)

def process_data(rdd, use_cache=False, runs=None):
    """Run a map/filter/map pipeline on ``rdd`` and time ``count()`` on it.

    The pipeline squares every element, keeps the even squares and halves
    them (true division, so the results are floats).

    Args:
        rdd: RDD of numbers to process.
        use_cache: When True, mark the pipeline's result for caching before
            the first action runs.
        runs: How many times to execute and time ``count()``. Defaults to 2
            when ``use_cache`` is True and 1 otherwise. ``cache()`` is lazy:
            the first action computes the data and stores it, so only a
            second action can show the timing of a cache hit.

    Returns:
        The transformed RDD (still marked as cached if ``use_cache`` is True).

    Raises:
        ValueError: If ``runs`` is smaller than 1.
    """
    if runs is None:
        runs = 2 if use_cache else 1
    if runs < 1:
        raise ValueError("runs must be at least 1")

    # Complex transformation chain
    result = (
        rdd.map(lambda x: x * x)
        .filter(lambda x: x % 2 == 0)
        .map(lambda x: x / 2)
    )

    if use_cache:
        result.cache()

    # Force execution and measure each run. perf_counter() is a monotonic
    # clock meant for measuring short durations.
    for run in range(1, runs + 1):
        start = time.perf_counter()
        count = result.count()
        duration = time.perf_counter() - start

        label = "Time taken" if runs == 1 else f"Time taken (run {run})"
        print(f"Count: {count:,}")
        print(f"{label}: {duration:.2f} seconds")

    return result

# Baseline: run the pipeline with caching explicitly switched off
print("Without caching:")
result1 = process_data(big_rdd, use_cache=False)

# Same pipeline on the same input, this time with caching requested
print("\nWith caching:")
result2 = process_data(big_rdd, use_cache=True)

In [None]:
# Create sample sentences
sentences = [
    "Spark is a fast and general engine for large-scale data processing",
    "Spark provides an interface for programming entire clusters",
    "RDDs are the fundamental data structure of Spark",
    "Spark supports multiple programming languages"
]

# Create RDD
lines_rdd = spark.sparkContext.parallelize(sentences)

print("Original sentences:")
for line in lines_rdd.collect():
    print(f"  {line}")

# Step 1: Lower-case each line and split it on whitespace, so "Spark" and
# "spark" are counted as the same word
words = lines_rdd.flatMap(lambda line: line.lower().split())
print("\nWords (first 10):", words.take(10))

# Step 2: Create word-count pairs
pairs = words.map(lambda word: (word, 1))
print("\nWord-count pairs (sample):", pairs.take(5))

# Step 3: Count occurrences
counts = pairs.reduceByKey(lambda a, b: a + b)

# Sort by descending frequency, breaking ties alphabetically. Most words here
# share the same count; sorting on the count alone leaves them in arbitrary
# order, so the "top 10" cut-off would not be reproducible.
sorted_counts = counts.sortBy(lambda pair: (-pair[1], pair[0]))

print("\nWord frequencies (top 10):")
for word, count in sorted_counts.take(10):
    print(f"  {word}: {count}")

In [None]:
# Spread the integers 0-99 over four partitions
rdd = spark.sparkContext.parallelize(range(100), numSlices=4)

partition_count = rdd.getNumPartitions()
print(f"Number of partitions: {partition_count}")

# See data in each partition
def print_partition(iterator):
    """Return a one-element list describing the items of one partition.

    Despite its name, the function prints nothing: it drains ``iterator``
    into a list and returns the formatted text wrapped in a list.
    """
    items = [element for element in iterator]
    return ["Partition data: " + str(items)]

# Run print_partition once per partition; each call yields one summary string
partitioned_data = rdd.mapPartitions(print_partition)
print("\nData by partition:")
for summary in partitioned_data.collect():
    print(summary)

In [None]:
# Stop the Spark session now that the notebook is finished with it
spark.stop()