
# SETUP: Install and Initialize PySpark

In [None]:
# !pip install pyspark
# !pip install py4j


In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os


# Initialize Spark

In [None]:
conf = SparkConf().setAppName("RDD_Lab").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

print(f"Spark Version: {sc.version}")
print(f"Default Parallelism: {sc.defaultParallelism}")

Spark Version: 3.5.1
Default Parallelism: 2


# PART 1: CREATING RDDs - Three Main Ways

# METHOD 1: Parallelized Collections (from Python data structures)

In [None]:
# From list
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
numbers_rdd = sc.parallelize(numbers)
print(f"Numbers RDD: {numbers_rdd.collect()}")

Numbers RDD: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


In [None]:
# From list with specific number of partitions
numbers_rdd_4_partitions = sc.parallelize(numbers, 4)
print(f"Partitions: {numbers_rdd_4_partitions.getNumPartitions()}")

Partitions: 4


In [None]:
# From tuple
data_tuples = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
people_rdd = sc.parallelize(data_tuples)
print(f"People RDD: {people_rdd.collect()}")

People RDD: [('Alice', 25), ('Bob', 30), ('Charlie', 35)]


In [None]:
# From range
range_rdd = sc.parallelize(range(1, 11))
print(f"Range RDD: {range_rdd.collect()}")

Range RDD: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


# METHOD 2: External Datasets (from files)

In [None]:
# Create sample files for demonstration
sample_text = """Apache Spark is a fast engine for large-scale data processing
It provides high-level APIs in Java, Scala, Python and R
Spark runs on Hadoop, Apache Mesos, Kubernetes, standalone
It can access diverse data sources"""

# Write sample file
with open('/tmp/sample.txt', 'w') as f:
    f.write(sample_text)

# Read text file
text_rdd = sc.textFile('/tmp/sample.txt')
print(f"Text file lines: {text_rdd.count()}")
print("First line:", text_rdd.first())

# Read with minimum partitions
text_rdd_min_parts = sc.textFile('/tmp/sample.txt', minPartitions=2)
print(f"Partitions with minPartitions=2: {text_rdd_min_parts.getNumPartitions()}")

Text file lines: 4
First line: Apache Spark is a fast engine for large-scale data processing
Partitions with minPartitions=2: 2


In [None]:
# Create sample data file (CSV-like)
sample_data = """name,age,city
John,28,NYC
Jane,32,LA
Mike,25,Chicago
Sarah,29,Boston"""

with open('/tmp/sample_data.txt', 'w') as f:
    f.write(sample_data)

data_rdd = sc.textFile('/tmp/sample_data.txt')
print(f"Data file content: {data_rdd.collect()}")

Data file content: ['name,age,city', 'John,28,NYC', 'Jane,32,LA', 'Mike,25,Chicago', 'Sarah,29,Boston']


# METHOD 3: Transformations from Existing RDDs

In [None]:
# Map transformation
squared_rdd = numbers_rdd.map(lambda x: x**2)
print(f"Squared numbers: {squared_rdd.collect()}")

Squared numbers: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]


In [None]:
# Filter transformation
even_rdd = numbers_rdd.filter(lambda x: x % 2 == 0)
print(f"Even numbers: {even_rdd.collect()}")

Even numbers: [2, 4, 6, 8, 10]


In [None]:
# FlatMap transformation
words_rdd = text_rdd.flatMap(lambda line: line.split())
print(f"Total words: {words_rdd.count()}")
print(f"First 10 words: {words_rdd.take(10)}")

Total words: 34
First 10 words: ['Apache', 'Spark', 'is', 'a', 'fast', 'engine', 'for', 'large-scale', 'data', 'processing']


# **Initialize Spark Context**

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd

# Create Spark configuration
conf = SparkConf().setAppName("RDD Operations Tutorial").setMaster("local[*]")

# Initialize Spark Context (if not already initialized)
try:
    sc.stop()  # Stop existing context if any
except:
    pass

sc = SparkContext(conf=conf)

print("✅ Spark Context initialized successfully!")
print(f"Spark Version: {sc.version}")
print(f"Python Version: {sc.pythonVer}")

✅ Spark Context initialized successfully!
Spark Version: 3.5.1
Python Version: 3.12


Create Sample Data

In [None]:
# Sample datasets we'll use throughout this tutorial
numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
words = sc.parallelize(["hello", "world", "spark", "python", "big", "data"])
sentences = sc.parallelize(["hello world", "spark is fast", "big data processing"])
key_value_pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("c", 1), ("b", 4)])

print("Sample datasets created!")
print(f"Numbers: {numbers.collect()}")
print(f"Words: {words.collect()}")

Sample datasets created!
Numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Words: ['hello', 'world', 'spark', 'python', 'big', 'data']


# This is a transformation - nothing happens yet!
doubled = numbers.map(lambda x: x * 2)
print("Transformation defined, but not executed yet")

# This is an action - now everything gets executed!
result = doubled.collect()
print(f"Action triggered! Result: {result}")

In [None]:
# This is a transformation - nothing happens yet!
doubled = numbers.map(lambda x: x * 2)
print("Transformation defined, but not executed yet")

# This is an action - now everything gets executed!
result = doubled.collect()
print(f"Action triggered! Result: {result}")

Transformation defined, but not executed yet
Action triggered! Result: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]


# **TRANSFORMATION OPERATIONS**

1.map(func) - One-to-One Mapping

Purpose: Transform each element individually

In [None]:
print("=== MAP TRANSFORMATION ===")

# Example 1: Simple arithmetic
numbers = sc.parallelize([1, 2, 3, 4, 5])
doubled = numbers.map(lambda x: x * 2)
squared = numbers.map(lambda x: x ** 2)

print(f"Original: {numbers.collect()}")
print(f"Doubled:  {doubled.collect()}")
print(f"Squared:  {squared.collect()}")

# Example 2: String operations
words = sc.parallelize(["hello", "world", "spark"])
upper_case = words.map(lambda word: word.upper())
lengths = words.map(lambda word: len(word))

print(f"Original: {words.collect()}")
print(f"Uppercase: {upper_case.collect()}")
print(f"Lengths: {lengths.collect()}")

# Example 3: Working with dictionaries
people = sc.parallelize([
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30},
    {"name": "Charlie", "age": 35}
])

age_groups = people.map(lambda p:
    f"{p['name']}: Young" if p['age'] < 30 else f"{p['name']}: Mature"
)
print(f"Age Groups: {age_groups.collect()}")

=== MAP TRANSFORMATION ===
Original: [1, 2, 3, 4, 5]
Doubled:  [2, 4, 6, 8, 10]
Squared:  [1, 4, 9, 16, 25]
Original: ['hello', 'world', 'spark']
Uppercase: ['HELLO', 'WORLD', 'SPARK']
Lengths: [5, 5, 5]
Age Groups: ['Alice: Young', 'Bob: Mature', 'Charlie: Mature']


2.flatMap(func) - One-to-Many Mapping

Purpose: Transform each element into multiple elements, then flatten

In [None]:
print("\n=== FLATMAP TRANSFORMATION ===")

# Example 1: Split sentences into words
sentences = sc.parallelize(["hello world", "spark is awesome", "big data"])
words = sentences.flatMap(lambda line: line.split(" "))
print(f"Sentences: {sentences.collect()}")
print(f"Words: {words.collect()}")

# Example 2: Generate ranges
nums = sc.parallelize([1, 2, 3])
ranges = nums.flatMap(lambda n: range(1, n + 1))
print(f"Original: {nums.collect()}")
print(f"Ranges: {ranges.collect()}")

# Example 3: Process structured data
data = sc.parallelize(["a:1,2,3", "b:4,5", "c:6"])
key_values = data.flatMap(lambda line: [
    (line.split(":")[0], int(val))
    for val in line.split(":")[1].split(",")
])
print(f"Key-Values: {key_values.collect()}")

# Example 4: Text processing
text_data = sc.parallelize([
    "Apache Spark is fast",
    "Python is great for data science",
    "Machine learning with PySpark"
])
# Extract words longer than 4 characters
long_words = text_data.flatMap(lambda line: [
    word.lower() for word in line.split() if len(word) > 4
])
print(f"Long words: {long_words.collect()}")


=== FLATMAP TRANSFORMATION ===
Sentences: ['hello world', 'spark is awesome', 'big data']
Words: ['hello', 'world', 'spark', 'is', 'awesome', 'big', 'data']
Original: [1, 2, 3]
Ranges: [1, 1, 2, 1, 2, 3]
Key-Values: [('a', 1), ('a', 2), ('a', 3), ('b', 4), ('b', 5), ('c', 6)]
Long words: ['apache', 'spark', 'python', 'great', 'science', 'machine', 'learning', 'pyspark']


3.filter(func) - Conditional Filtering

Purpose: Keep only elements that satisfy a condition

In [None]:
print("\n=== FILTER TRANSFORMATION ===")

# Example 1: Filter numbers
numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
even_numbers = numbers.filter(lambda x: x % 2 == 0)
large_numbers = numbers.filter(lambda x: x > 5)

print(f"Original: {numbers.collect()}")
print(f"Even: {even_numbers.collect()}")
print(f"Greater than 5: {large_numbers.collect()}")

# Example 2: Filter strings
words = sc.parallelize(["spark", "hadoop", "python", "scala", "java"])
long_words = words.filter(lambda word: len(word) > 5)
words_with_a = words.filter(lambda word: "a" in word)

print(f"Original: {words.collect()}")
print(f"Long words: {long_words.collect()}")
print(f"Contains 'a': {words_with_a.collect()}")

# Example 3: Complex filtering with dictionaries
students = sc.parallelize([
    {"name": "Alice", "grade": 85, "subject": "Math"},
    {"name": "Bob", "grade": 92, "subject": "Science"},
    {"name": "Charlie", "grade": 78, "subject": "Math"},
    {"name": "Diana", "grade": 96, "subject": "Science"}
])

high_performers = students.filter(lambda s: s["grade"] >= 90)
math_students = students.filter(lambda s: s["subject"] == "Math")

print(f"High performers: {[s['name'] for s in high_performers.collect()]}")
print(f"Math students: {[s['name'] for s in math_students.collect()]}")

# Example 4: Filter with multiple conditions
sales_data = sc.parallelize([
    {"product": "laptop", "price": 1200, "quantity": 5},
    {"product": "phone", "price": 800, "quantity": 10},
    {"product": "tablet", "price": 600, "quantity": 3},
    {"product": "monitor", "price": 300, "quantity": 8}
])

expensive_popular = sales_data.filter(
    lambda item: item["price"] > 500 and item["quantity"] > 5
)
print(f"Expensive & Popular: {expensive_popular.collect()}")


=== FILTER TRANSFORMATION ===
Original: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Even: [2, 4, 6, 8, 10]
Greater than 5: [6, 7, 8, 9, 10]
Original: ['spark', 'hadoop', 'python', 'scala', 'java']
Long words: ['hadoop', 'python']
Contains 'a': ['spark', 'hadoop', 'scala', 'java']
High performers: ['Bob', 'Diana']
Math students: ['Alice', 'Charlie']
Expensive & Popular: [{'product': 'phone', 'price': 800, 'quantity': 10}]


4.mapPartitions(func) - Partition-Level Processing

Purpose: Process entire partitions at once (efficient for expensive setup)

In [None]:
print("\n=== MAPPARTITIONS TRANSFORMATION ===")

numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8], 3)  # 3 partitions

# Example 1: Expensive setup simulation
def expensive_processing(partition):
    print(f"Processing partition with expensive setup...")
    # Simulate expensive setup (database connection, etc.)
    expensive_resource = "DatabaseConnection"

    # Process all elements in this partition
    return [x * 10 for x in partition]

processed = numbers.mapPartitions(expensive_processing)
print(f"Result: {processed.collect()}")

# Example 2: Batch processing with statistics
def batch_statistics(partition):
    batch = list(partition)
    if not batch:
        return []

    batch_sum = sum(batch)
    batch_count = len(batch)
    batch_avg = batch_sum / batch_count

    print(f"Processing batch of size: {batch_count}, avg: {batch_avg}")
    return [(x, batch_avg) for x in batch]

batch_processed = numbers.mapPartitions(batch_statistics)
print(f"Batch processed: {batch_processed.collect()}")

# Example 3: Using libraries efficiently
def normalize_partition(partition):
    # Convert partition to numpy array (efficient for mathematical operations)
    import numpy as np
    data = list(partition)
    if not data:
        return []

    arr = np.array(data)
    normalized = (arr - arr.mean()) / arr.std() if arr.std() > 0 else arr
    return normalized.tolist()

normalized = numbers.mapPartitions(normalize_partition)
print(f"Normalized: {normalized.collect()}")


=== MAPPARTITIONS TRANSFORMATION ===
Result: [10, 20, 30, 40, 50, 60, 70, 80]
Batch processed: [(1, 1.5), (2, 1.5), (3, 3.5), (4, 3.5), (5, 6.5), (6, 6.5), (7, 6.5), (8, 6.5)]
Normalized: [-1.0, 1.0, -1.0, 1.0, -1.3416407864998738, -0.4472135954999579, 0.4472135954999579, 1.3416407864998738]


5.mapPartitionsWithIndex(func) - Partition Processing with Index

Purpose: Process partitions with access to partition index

In [None]:
print("\n=== MAPPARTITIONSWITHINDEX TRANSFORMATION ===")

numbers = sc.parallelize([1, 2, 3, 4, 5, 6], 3)

def indexed_processing(partition_id, partition):
    print(f"Processing partition {partition_id}")
    return [(partition_id, x * 10) for x in partition]

indexed_result = numbers.mapPartitionsWithIndex(indexed_processing)
print(f"Indexed result: {indexed_result.collect()}")

# Example: Add partition info to data
def add_partition_info(partition_id, partition):
    return [f"P{partition_id}: {value}" for value in partition]

with_partition_info = numbers.mapPartitionsWithIndex(add_partition_info)
print(f"With partition info: {with_partition_info.collect()}")

# Example: Partition-specific processing
def partition_specific_logic(partition_id, partition):
    multiplier = partition_id + 1  # Different multiplier per partition
    return [x * multiplier for x in partition]

partition_specific = numbers.mapPartitionsWithIndex(partition_specific_logic)
print(f"Partition-specific: {partition_specific.collect()}")


=== MAPPARTITIONSWITHINDEX TRANSFORMATION ===
Indexed result: [(0, 10), (0, 20), (1, 30), (1, 40), (2, 50), (2, 60)]
With partition info: ['P0: 1', 'P0: 2', 'P1: 3', 'P1: 4', 'P2: 5', 'P2: 6']
Partition-specific: [1, 2, 6, 8, 15, 18]


6.Set Operations

In [None]:
print("\n=== SET OPERATIONS ===")

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])

print(f"RDD1: {rdd1.collect()}")
print(f"RDD2: {rdd2.collect()}")

# Union: Combine all elements (duplicates included)
union_result = rdd1.union(rdd2)
print(f"Union: {union_result.collect()}")

# Intersection: Common elements only
intersection_result = rdd1.intersection(rdd2)
print(f"Intersection: {intersection_result.collect()}")

# Subtract: Elements in rdd1 but not in rdd2
subtract_result = rdd1.subtract(rdd2)
print(f"Subtract (RDD1 - RDD2): {subtract_result.collect()}")

# Distinct: Remove duplicates
with_duplicates = sc.parallelize([1, 2, 2, 3, 3, 3, 4, 5])
distinct_result = with_duplicates.distinct()
print(f"With duplicates: {with_duplicates.collect()}")
print(f"Distinct: {distinct_result.collect()}")

# Cartesian product
small_rdd1 = sc.parallelize([1, 2])
small_rdd2 = sc.parallelize(["a", "b"])
cartesian_result = small_rdd1.cartesian(small_rdd2)
print(f"Cartesian product: {cartesian_result.collect()}")


=== SET OPERATIONS ===
RDD1: [1, 2, 3, 4, 5]
RDD2: [4, 5, 6, 7, 8]
Union: [1, 2, 3, 4, 5, 4, 5, 6, 7, 8]
Intersection: [4, 5]
Subtract (RDD1 - RDD2): [1, 2, 3]
With duplicates: [1, 2, 2, 3, 3, 3, 4, 5]
Distinct: [2, 4, 1, 3, 5]
Cartesian product: [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]


7.Key-Value Transformations

In [None]:
print("\n=== KEY-VALUE TRANSFORMATIONS ===")

pairs = sc.parallelize([
    ("apple", 5), ("banana", 3), ("apple", 2),
    ("orange", 4), ("banana", 1), ("apple", 3)
])

# groupByKey: Group values by key (⚠️ Expensive - causes shuffle)
grouped = pairs.groupByKey()
print("Grouped by key:")
for key, values in grouped.collect():
    print(f"  {key}: {list(values)}")

# reduceByKey: Better than groupByKey - aggregates locally first
sum_by_key = pairs.reduceByKey(lambda a, b: a + b)
print(f"Sum by key: {sum_by_key.collect()}")

max_by_key = pairs.reduceByKey(lambda a, b: max(a, b))
print(f"Max by key: {max_by_key.collect()}")

# mapValues: Apply function to values only
doubled_values = pairs.mapValues(lambda x: x * 2)
print(f"Doubled values: {doubled_values.collect()}")

# keys and values: Extract keys or values
keys_only = pairs.keys()
values_only = pairs.values()
print(f"Keys: {keys_only.collect()}")
print(f"Values: {values_only.collect()}")

# sortByKey: Sort by keys
sorted_pairs = pairs.sortByKey()
print(f"Sorted by key: {sorted_pairs.collect()}")

# sortBy: Sort by custom function
sorted_by_value = pairs.sortBy(lambda x: x[1])  # Sort by value
print(f"Sorted by value: {sorted_by_value.collect()}")

# Join operations
prices = sc.parallelize([("apple", 1.2), ("banana", 0.8), ("orange", 1.5)])
quantities = sc.parallelize([("apple", 10), ("banana", 15), ("orange", 8)])

# Inner join
joined = prices.join(quantities)
print("Price-Quantity join:")
for item, (price, qty) in joined.collect():
    print(f"  {item}: ${price} x {qty} = ${price * qty}")

# Left outer join
left_joined = prices.leftOuterJoin(quantities)
print("Left outer join:")
for item, (price, qty) in left_joined.collect():
    qty_str = str(qty) if qty is not None else "N/A"
    print(f"  {item}: ${price} x {qty_str}")


=== KEY-VALUE TRANSFORMATIONS ===
Grouped by key:
  apple: [5, 2, 3]
  banana: [3, 1]
  orange: [4]
Sum by key: [('apple', 10), ('banana', 4), ('orange', 4)]
Max by key: [('apple', 5), ('banana', 3), ('orange', 4)]
Doubled values: [('apple', 10), ('banana', 6), ('apple', 4), ('orange', 8), ('banana', 2), ('apple', 6)]
Keys: ['apple', 'banana', 'apple', 'orange', 'banana', 'apple']
Values: [5, 3, 2, 4, 1, 3]
Sorted by key: [('apple', 5), ('apple', 2), ('apple', 3), ('banana', 3), ('banana', 1), ('orange', 4)]
Sorted by value: [('banana', 1), ('apple', 2), ('banana', 3), ('apple', 3), ('orange', 4), ('apple', 5)]
Price-Quantity join:
  apple: $1.2 x 10 = $12.0
  banana: $0.8 x 15 = $12.0
  orange: $1.5 x 8 = $12.0
Left outer join:
  apple: $1.2 x 10
  banana: $0.8 x 15
  orange: $1.5 x 8


8.Partitioning Operations

In [None]:
print("\n=== PARTITIONING OPERATIONS ===")

data = sc.parallelize(list(range(1, 11)), 4)
print(f"Original partitions: {data.getNumPartitions()}")
print(f"Elements per partition: {data.glom().collect()}")

# coalesce: Reduce partitions without full shuffle
coalesced = data.coalesce(2)
print(f"After coalesce: {coalesced.getNumPartitions()}")
print(f"Elements per partition: {coalesced.glom().collect()}")

# repartition: Change partitions with full shuffle
repartitioned = data.repartition(6)
print(f"After repartition: {repartitioned.getNumPartitions()}")

# partitionBy: Custom partitioning for key-value RDDs
key_value = sc.parallelize([(i, i*2) for i in range(10)])
partitioned_kv = key_value.partitionBy(3, lambda k: k % 3)
print(f"Custom partitioned: {partitioned_kv.glom().collect()}")


=== PARTITIONING OPERATIONS ===
Original partitions: 4
Elements per partition: [[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]
After coalesce: 2
Elements per partition: [[1, 2, 3, 4], [5, 6, 7, 8, 9, 10]]
After repartition: 6
Custom partitioned: [[(0, 0), (3, 6), (6, 12), (9, 18)], [(1, 2), (4, 8), (7, 14)], [(2, 4), (5, 10), (8, 16)]]


9.Sampling Operations

In [None]:
print("\n=== SAMPLING OPERATIONS ===")

large_numbers = sc.parallelize(range(1, 101))

# sample: Random sampling
sampled = large_numbers.sample(withReplacement=False, fraction=0.1, seed=42)
print(f"Sample (10%): {sorted(sampled.collect())}")

# takeSample: Take exact number of samples
exact_sample = large_numbers.takeSample(withReplacement=False, num=5, seed=42)
print(f"Exact 5 samples: {sorted(exact_sample)}")

# sampleByKey: Sample by key for key-value RDDs
kv_data = sc.parallelize([("A", i) for i in range(20)] + [("B", i) for i in range(20)])
sampled_by_key = kv_data.sampleByKey(withReplacement=False, fractions={"A": 0.3, "B": 0.2})
print("Sampled by key:")
for key, value in sampled_by_key.collect():
    print(f"  {key}: {value}")


=== SAMPLING OPERATIONS ===
Sample (10%): [1, 4, 11, 18, 19, 33, 36, 57, 68, 71, 74, 80, 89, 95]
Exact 5 samples: [5, 34, 35, 60, 74]
Sampled by key:
  A: 2
  A: 4
  A: 13
  A: 16
  A: 19
  B: 6
  B: 9
  B: 14
  B: 15


# **ACTION OPERATIONS**

1.count() - Count Elements

In [None]:
print("\n=== COUNT ACTION ===")

numbers = sc.parallelize([1, 2, 3, 4, 5])
words = sc.parallelize(["spark", "python", "big", "data"])

print(f"Numbers count: {numbers.count()}")
print(f"Words count: {words.count()}")
print(f"Even numbers count: {numbers.filter(lambda x: x % 2 == 0).count()}")

# countApprox: Approximate count for large datasets
large_data = sc.parallelize(range(1000000))
approx_count = large_data.countApprox(timeout=1000, confidence=0.95)
print(f"Approximate count: {approx_count}")


=== COUNT ACTION ===
Numbers count: 5
Words count: 4
Even numbers count: 2
Approximate count: 1000000


2.collect() - Bring All Data to Driver

Use with caution for large datasets!

In [None]:
print("\n=== COLLECT ACTION ===")

small = sc.parallelize([1, 2, 3, 4, 5])
result = small.collect()
print(f"Collected: {result}")
print(f"Type: {type(result)}")

# Don't do this with large datasets!
# large = sc.parallelize(range(1000000))
# bad_idea = large.collect()  # This could crash your driver!

# Better alternatives for large datasets
print("Better alternatives:")
print(f"First 10 elements: {small.take(10)}")
print(f"Sample of elements: {small.sample(False, 0.5).collect()}")


=== COLLECT ACTION ===
Collected: [1, 2, 3, 4, 5]
Type: <class 'list'>
Better alternatives:
First 10 elements: [1, 2, 3, 4, 5]
Sample of elements: [1, 2, 3, 4, 5]


3.take(n) - Get First N Elements

In [None]:
print("\n=== TAKE ACTION ===")

numbers = sc.parallelize([10, 5, 8, 3, 12, 7, 1, 9])

first_3 = numbers.take(3)
first_5 = numbers.take(5)

print(f"Original (showing first 8): {numbers.take(8)}")
print(f"First 3: {first_3}")
print(f"First 5: {first_5}")

# takeOrdered: Take smallest n elements
smallest_3 = numbers.takeOrdered(3)
print(f"Smallest 3: {smallest_3}")

# takeOrdered with custom key
words = sc.parallelize(["spark", "python", "apache", "big", "data"])
shortest_3 = words.takeOrdered(3, key=len)
print(f"Shortest 3 words: {shortest_3}")


=== TAKE ACTION ===
Original (showing first 8): [10, 5, 8, 3, 12, 7, 1, 9]
First 3: [10, 5, 8]
First 5: [10, 5, 8, 3, 12]
Smallest 3: [1, 3, 5]
Shortest 3 words: ['big', 'data', 'spark']


4.top(n) - Get Top N Elements

In [None]:
print("\n=== TOP ACTION ===")

numbers = sc.parallelize([10, 5, 8, 3, 12, 7, 1, 9])
top_3 = numbers.top(3)
top_5 = numbers.top(5)

print(f"Top 3: {top_3}")
print(f"Top 5: {top_5}")

# Custom ordering for complex data
people = sc.parallelize([
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30},
    {"name": "Charlie", "age": 35},
    {"name": "Diana", "age": 20}
])

oldest_2 = people.top(2, key=lambda person: person["age"])
print(f"Oldest 2: {[f'{p['name']}({p['age']})' for p in oldest_2]}")

# Longest words
words = sc.parallelize(["spark", "python", "apache", "big", "data", "processing"])
longest_3 = words.top(3, key=len)
print(f"Longest 3 words: {longest_3}")


=== TOP ACTION ===
Top 3: [12, 10, 9]
Top 5: [12, 10, 9, 8, 7]
Oldest 2: ['Charlie(35)', 'Bob(30)']
Longest 3 words: ['processing', 'python', 'apache']


5.countByValue() - Frequency Count

In [None]:
print("\n=== COUNTBYVALUE ACTION ===")

letters = sc.parallelize(["a", "b", "a", "c", "b", "a", "d"])
frequencies = letters.countByValue()

print("Letter frequencies:")
for letter, count in sorted(frequencies.items()):
    print(f"  '{letter}': {count} times")

# With numbers
numbers = sc.parallelize([1, 2, 1, 3, 2, 1, 4, 2])
num_freq = numbers.countByValue()
print("Number frequencies:")
for num in sorted(num_freq.keys()):
    print(f"  {num}: {num_freq[num]} times")

# With complex data
grades = sc.parallelize(["A", "B", "A", "C", "B", "A", "B", "C", "A"])
grade_counts = grades.countByValue()
print("Grade distribution:")
for grade in sorted(grade_counts.keys()):
    print(f"  {grade}: {grade_counts[grade]} students")


=== COUNTBYVALUE ACTION ===
Letter frequencies:
  'a': 3 times
  'b': 2 times
  'c': 1 times
  'd': 1 times
Number frequencies:
  1: 3 times
  2: 3 times
  3: 1 times
  4: 1 times
Grade distribution:
  A: 4 students
  B: 3 students
  C: 2 students


6.reduce(func) - Combine Elements

In [None]:
print("\n=== REDUCE ACTION ===")

numbers = sc.parallelize([1, 2, 3, 4, 5])

# Sum all elements
total_sum = numbers.reduce(lambda a, b: a + b)
print(f"Sum: {total_sum}")

# Find maximum
maximum = numbers.reduce(lambda a, b: max(a, b))
print(f"Maximum: {maximum}")

# Find minimum
minimum = numbers.reduce(lambda a, b: min(a, b))
print(f"Minimum: {minimum}")

# String concatenation
words = sc.parallelize(["Hello", "World", "Spark", "Python"])
concatenated = words.reduce(lambda a, b: a + " " + b)
print(f"Concatenated: {concatenated}")

# Product of all numbers
product = numbers.reduce(lambda a, b: a * b)
print(f"Product: {product}")

# Find longest string
long_words = sc.parallelize(["spark", "python", "apache", "data", "processing"])
longest = long_words.reduce(lambda a, b: a if len(a) > len(b) else b)
print(f"Longest word: {longest}")


=== REDUCE ACTION ===
Sum: 15
Maximum: 5
Minimum: 1
Concatenated: Hello World Spark Python
Product: 120
Longest word: processing


7.fold(zeroValue, func) - Reduce with Initial Value

In [None]:
print("\n=== FOLD ACTION ===")

numbers = sc.parallelize([1, 2, 3, 4, 5])

# Sum with initial value
sum_with_init = numbers.fold(10, lambda a, b: a + b)
print(f"Sum with initial value 10: {sum_with_init}")

# Product with initial value
product = numbers.fold(1, lambda a, b: a * b)
print(f"Product: {product}")

# String folding
words = sc.parallelize(["Spark", "is", "fast"])
sentence = words.fold("Apache", lambda a, b: a + " " + b)
print(f"Sentence: {sentence}")

# Finding maximum with initial value
max_with_init = numbers.fold(0, lambda a, b: max(a, b))
print(f"Max with initial 0: {max_with_init}")

# Building a set ,Use aggregate() instead
unique_letters = sc.parallelize(["a", "b", "c", "a", "b"])
letter_set = unique_letters.aggregate(
    set(),                                    # Initial accumulator
    lambda acc, x: acc.union({x}),           # Combine element with accumulator
    lambda acc1, acc2: acc1.union(acc2)      # Combine two accumulators
)
print(f"Unique letters: {letter_set}")

#: Use distinct()
unique_letters = sc.parallelize(["a", "b", "c", "a", "b"])
result = set(unique_letters.distinct().collect())
print(f"Unique letters: {result}")

# Fix fold() by making types consistent
unique_letters = sc.parallelize(["a", "b", "c", "a", "b"])
letter_sets = unique_letters.map(lambda x: {x})  # Convert strings to sets first
combined_set = letter_sets.fold(set(), lambda acc, x: acc.union(x))
print(f"Unique letters: {combined_set}")




=== FOLD ACTION ===
Sum with initial value 10: 45
Product: 120
Sentence: Apache Apache Spark Apache is fast
Max with initial 0: 5
Unique letters: {'a', 'b', 'c'}
Unique letters: {'b', 'a', 'c'}
Unique letters: {'a', 'b', 'c'}


8.aggregate() - Flexible Aggregation

In [None]:
print("\n=== AGGREGATE ACTION ===")

numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Calculate sum and count in one pass
def seq_op(acc, value):
    # acc is (sum, count)
    return (acc[0] + value, acc[1] + 1)

def comb_op(acc1, acc2):
    # Combine two accumulators
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

sum_count = numbers.aggregate((0, 0), seq_op, comb_op)
total_sum, count = sum_count
average = total_sum / count if count > 0 else 0

print(f"Sum: {total_sum}, Count: {count}, Average: {average}")

# More complex aggregation - statistics
def stats_seq_op(stats, value):
    # stats is [sum, count, max, min]
    return [
        stats[0] + value,          # sum
        stats[1] + 1,              # count
        max(stats[2], value),      # max
        min(stats[3], value)       # min
    ]

def stats_comb_op(stats1, stats2):
    return [
        stats1[0] + stats2[0],            # sum
        stats1[1] + stats2[1],            # count
        max(stats1[2], stats2[2]),        # max
        min(stats1[3], stats2[3])         # min
    ]

import sys
initial_stats = [0, 0, -sys.maxsize, sys.maxsize]
final_stats = numbers.aggregate(initial_stats, stats_seq_op, stats_comb_op)

print(f"Advanced Stats:")
print(f"  Sum: {final_stats[0]}")
print(f"  Count: {final_stats[1]}")
print(f"  Max: {final_stats[2]}")
print(f"  Min: {final_stats[3]}")
print(f"  Average: {final_stats[0] / final_stats[1]}")

# Text analysis example
text_data = sc.parallelize([
    "hello world", "spark is great", "python rocks",
    "big data analytics", "machine learning"
])

def text_seq_op(acc, sentence):
    words = sentence.split()
    return {
        'total_chars': acc['total_chars'] + len(sentence),
        'total_words': acc['total_words'] + len(words),
        'sentence_count': acc['sentence_count'] + 1,
        'longest_word': max(acc['longest_word'], max(words, key=len), key=len)
    }

def text_comb_op(acc1, acc2):
    return {
        'total_chars': acc1['total_chars'] + acc2['total_chars'],
        'total_words': acc1['total_words'] + acc2['total_words'],
        'sentence_count': acc1['sentence_count'] + acc2['sentence_count'],
        'longest_word': max(acc1['longest_word'], acc2['longest_word'], key=len)
    }

text_stats = text_data.aggregate(
    {'total_chars': 0, 'total_words': 0, 'sentence_count': 0, 'longest_word': ''},
    text_seq_op,
    text_comb_op
)

print("Text Statistics:")
print(f"  Total characters: {text_stats['total_chars']}")
print(f"  Total words: {text_stats['total_words']}")
print(f"  Sentences: {text_stats['sentence_count']}")
print(f"  Average words per sentence: {text_stats['total_words'] / text_stats['sentence_count']:.1f}")
print(f"  Longest word: '{text_stats['longest_word']}'")


=== AGGREGATE ACTION ===
Sum: 55, Count: 10, Average: 5.5
Advanced Stats:
  Sum: 55
  Count: 10
  Max: 10
  Min: 1
  Average: 5.5
Text Statistics:
  Total characters: 71
  Total words: 12
  Sentences: 5
  Average words per sentence: 2.4
  Longest word: 'analytics'


9.foreach(func) - Side Effects

In [None]:
print("\n=== FOREACH ACTION ===")

numbers = sc.parallelize([1, 2, 3, 4, 5])

# Simple printing (Note: output might not appear in order due to distributed nature)
print("Printing each element:")
numbers.foreach(lambda x: print(f"  Value: {x}"))

# Collecting data in a shared variable (use with caution)
collected_data = []

def collect_data(value):
    # In real distributed environment, this won't work as expected
    # Each executor has its own copy of the list
    collected_data.append(value * 2)

numbers.foreach(collect_data)
print(f"Collected data (local only): {collected_data}")

# Better approach: use foreachPartition for batch operations
def process_partition(partition):
    batch = list(partition)
    print(f"Processing partition with {len(batch)} elements: {batch}")
    # Simulate batch processing (e.g., database insert)
    for value in batch:
        # Simulate some processing
        pass

numbers.foreachPartition(process_partition)

# Practical example: simulating writes to external system
def write_to_external_system(partition):
    batch = list(partition)
    if batch:
        print(f"Writing batch of {len(batch)} elements to external system")
        # Simulate writing to database, file, etc.
        for value in batch:
            print(f"  Writing {value}")

numbers.foreachPartition(write_to_external_system)


=== FOREACH ACTION ===
Printing each element:
Collected data (local only): []


10.Additional Useful Actions

In [None]:
print("\n=== ADDITIONAL ACTIONS ===")

numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
key_value = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("c", 1)])

# first: Get first element
print(f"First element: {numbers.first()}")

# isEmpty: Check if RDD is empty
print(f"Is empty: {numbers.isEmpty()}")
empty_rdd = sc.parallelize([])
print(f"Empty RDD is empty: {empty_rdd.isEmpty()}")

# sample: Random sampling (this returns an RDD, different from takeSample)
sampled_rdd = numbers.sample(withReplacement=False, fraction=0.3, seed=42)
print(f"Sampled elements: {sampled_rdd.collect()}")

# countByKey: Count elements per key
key_value_counts = key_value.countByKey()
print("Counts by key:")
for key, count in sorted(key_value_counts.items()):
    print(f"  Key '{key}': {count} times")

# glom: Convert each partition to array
partitioned = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
partition_arrays = partitioned.glom()
print("Partition contents:")
for i, partition in enumerate(partition_arrays.collect()):
    print(f"  Partition {i}: {list(partition)}")

# lookup: Get values for specific keys (only for key-value RDDs)
kv_rdd = sc.parallelize([("apple", 5), ("banana", 3), ("apple", 2), ("cherry", 8)])
apple_values = kv_rdd.lookup("apple")
print(f"Values for 'apple': {apple_values}")

# min and max
print(f"Minimum: {numbers.min()}")
print(f"Maximum: {numbers.max()}")

# variance and stdev (statistical functions)
print(f"Variance: {numbers.variance()}")
print(f"Standard deviation: {numbers.stdev()}")
print(f"Mean: {numbers.mean()}")

# histogram: Get histogram of values
hist_data = sc.parallelize([1, 1, 2, 3, 3, 3, 4, 5, 5, 6, 7, 8, 9, 10])
histogram = hist_data.histogram(5)  # 5 buckets
bucket_boundaries, counts = histogram
print("Histogram:")
for i in range(len(counts)):
    print(f"  [{bucket_boundaries[i]:.1f}, {bucket_boundaries[i+1]:.1f}): {counts[i]}")

# stats: Get comprehensive statistics
stats = numbers.stats()
print(f"Stats: count={stats.count()}, mean={stats.mean():.2f}, stdev={stats.stdev():.2f}")


=== ADDITIONAL ACTIONS ===
First element: 1
Is empty: False
Empty RDD is empty: True
Sampled elements: [1, 2, 4, 5]
Counts by key:
  Key 'a': 2 times
  Key 'b': 1 times
  Key 'c': 1 times
Partition contents:
  Partition 0: [1, 2]
  Partition 1: [3, 4]
  Partition 2: [5, 6]
Values for 'apple': [5, 2]
Minimum: 1
Maximum: 10
Variance: 8.25
Standard deviation: 2.8722813232690143
Mean: 5.5
Histogram:
  [1.0, 2.8): 3
  [2.8, 4.6): 4
  [4.6, 6.4): 3
  [6.4, 8.2): 2
  [8.2, 10.0): 2
Stats: count=10, mean=5.50, stdev=2.87


# **Word Count**

In [None]:
print("\n=== PATTERN: WORD COUNT ===")

# Create sample text data
text_lines = sc.parallelize([
    "Apache Spark is fast",
    "Spark can process big data",
    "Big data processing with Spark",
    "Python and Spark work great together",
    "PySpark makes big data easy"
])

# Classic word count implementation
word_counts = (text_lines
    .flatMap(lambda line: line.lower().split())  # Split into words
    .filter(lambda word: word.isalpha())         # Remove non-alphabetic
    .map(lambda word: (word, 1))                 # Create (word, 1) pairs
    .reduceByKey(lambda a, b: a + b)             # Count occurrences
    .sortBy(lambda x: x[1], ascending=False))    # Sort by count descending

print("Word counts:")
for word, count in word_counts.collect():
    print(f"  '{word}': {count}")

# More advanced word count with filtering
advanced_word_counts = (text_lines
    .flatMap(lambda line: line.lower().split())
    .filter(lambda word: len(word) > 3)          # Only words longer than 3 chars
    .filter(lambda word: word.isalpha())         # Only alphabetic words
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .filter(lambda x: x[1] >= 2)                 # Only words appearing 2+ times
    .sortBy(lambda x: x[1], ascending=False))

print("\nAdvanced word counts (len>3, freq>=2):")
for word, count in advanced_word_counts.collect():
    print(f"  '{word}': {count}")


=== PATTERN: WORD COUNT ===
Word counts:
  'spark': 4
  'big': 3
  'data': 3
  'apache': 1
  'fast': 1
  'with': 1
  'python': 1
  'and': 1
  'work': 1
  'easy': 1
  'is': 1
  'can': 1
  'process': 1
  'processing': 1
  'great': 1
  'together': 1
  'pyspark': 1
  'makes': 1

Advanced word counts (len>3, freq>=2):
  'spark': 4
  'data': 3


# **Data Processing Pipeline**

In [None]:
print("\n=== PATTERN: DATA PROCESSING PIPELINE ===")

# Sample sales data
sales_data = sc.parallelize([
    {"product": "laptop", "amount": 1200.0, "region": "north", "salesperson": "Alice"},
    {"product": "phone", "amount": 800.0, "region": "south", "salesperson": "Bob"},
    {"product": "laptop", "amount": 1300.0, "region": "north", "salesperson": "Charlie"},
    {"product": "tablet", "amount": 600.0, "region": "east", "salesperson": "Diana"},
    {"product": "phone", "amount": 750.0, "region": "south", "salesperson": "Alice"},
    {"product": "laptop", "amount": 1100.0, "region": "west", "salesperson": "Bob"},
    {"product": "monitor", "amount": 400.0, "region": "north", "salesperson": "Charlie"}
])

# Processing pipeline
result = (sales_data
    .filter(lambda sale: sale["amount"] > 700)                    # High-value sales only
    .map(lambda sale: (sale["region"], sale["amount"]))           # Extract region and amount
    .reduceByKey(lambda a, b: a + b)                             # Sum by region
    .sortBy(lambda x: x[1], ascending=False))                     # Sort by total descending

print("High-value sales by region (>$700):")
for region, total in result.collect():
    print(f"  {region}: ${total:,.2f}")

# More complex pipeline: product analysis
product_analysis = (sales_data
    .map(lambda sale: (sale["product"], (sale["amount"], 1)))     # (product, (amount, count))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))        # Sum amounts and counts
    .mapValues(lambda x: {
        "total_sales": x[0],
        "count": x[1],
        "average": x[0] / x[1]
    })
    .sortBy(lambda x: x[1]["total_sales"], ascending=False))

print("\nProduct analysis:")
for product, stats in product_analysis.collect():
    print(f"  {product}:")
    print(f"    Total sales: ${stats['total_sales']:,.2f}")
    print(f"    Units sold: {stats['count']}")
    print(f"    Average price: ${stats['average']:,.2f}")

# Salesperson performance
salesperson_performance = (sales_data
    .map(lambda sale: (sale["salesperson"], sale["amount"]))
    .groupByKey()
    .mapValues(lambda amounts: {
        "sales": list(amounts),
        "total": sum(amounts),
        "count": len(list(amounts)),
        "average": sum(amounts) / len(list(amounts))
    })
    .sortBy(lambda x: x[1]["total"], ascending=False))

print("\nSalesperson performance:")
for person, stats in salesperson_performance.collect():
    print(f"  {person}:")
    print(f"    Total: ${stats['total']:,.2f}")
    print(f"    Transactions: {stats['count']}")
    print(f"    Average: ${stats['average']:,.2f}")


=== PATTERN: DATA PROCESSING PIPELINE ===
High-value sales by region (>$700):
  north: $2,500.00
  south: $1,550.00
  west: $1,100.00

Product analysis:
  laptop:
    Total sales: $3,600.00
    Units sold: 3
    Average price: $1,200.00
  phone:
    Total sales: $1,550.00
    Units sold: 2
    Average price: $775.00
  tablet:
    Total sales: $600.00
    Units sold: 1
    Average price: $600.00
  monitor:
    Total sales: $400.00
    Units sold: 1
    Average price: $400.00

Salesperson performance:
  Alice:
    Total: $1,950.00
    Transactions: 2
    Average: $975.00
  Bob:
    Total: $1,900.00
    Transactions: 2
    Average: $950.00
  Charlie:
    Total: $1,700.00
    Transactions: 2
    Average: $850.00
  Diana:
    Total: $600.00
    Transactions: 1
    Average: $600.00


# **Join Operations Pattern**

In [None]:
print("\n=== PATTERN: JOIN OPERATIONS ===")

# Sample data
customers = sc.parallelize([
    (1, {"name": "Alice", "city": "New York"}),
    (2, {"name": "Bob", "city": "San Francisco"}),
    (3, {"name": "Charlie", "city": "Chicago"}),
    (4, {"name": "Diana", "city": "Boston"})
])

orders = sc.parallelize([
    (1, {"product": "laptop", "amount": 1200}),
    (2, {"product": "phone", "amount": 800}),
    (1, {"product": "tablet", "amount": 600}),
    (3, {"product": "laptop", "amount": 1300}),
    (5, {"product": "monitor", "amount": 400})  # Customer 5 doesn't exist
])

products = sc.parallelize([
    ("laptop", {"category": "electronics", "weight": 2.5}),
    ("phone", {"category": "electronics", "weight": 0.2}),
    ("tablet", {"category": "electronics", "weight": 0.5}),
    ("monitor", {"category": "electronics", "weight": 3.0})
])

# Inner join: customers with orders
customer_orders = customers.join(orders)
print("Customer orders (inner join):")
for customer_id, (customer_info, order_info) in customer_orders.collect():
    print(f"  {customer_info['name']} from {customer_info['city']} ordered {order_info['product']}")

# Left outer join: all customers (with optional orders)
all_customers = customers.leftOuterJoin(orders)
print("\nAll customers with their orders (left outer join):")
for customer_id, (customer_info, order_info) in all_customers.collect():
    if order_info:
        print(f"  {customer_info['name']}: {order_info['product']}")
    else:
        print(f"  {customer_info['name']}: No orders")

# Complex join: customer orders with product details
# First, create order-product pairs
order_products = orders.map(lambda x: (x[1]["product"], (x[0], x[1])))  # (product, (customer_id, order))

# Join with product information
detailed_orders = order_products.join(products)
print("\nDetailed order information:")
for product, ((customer_id, order), product_info) in detailed_orders.collect():
    print(f"  Product: {product}, Customer ID: {customer_id}")
    print(f"    Amount: ${order['amount']}, Category: {product_info['category']}")

# Three-way join simulation
# Get customer name for each order with product details
customer_name_map = customers.collectAsMap()  # Small dataset, safe to collect

detailed_with_names = detailed_orders.map(
    lambda x: {
        "customer": customer_name_map.get(x[1][0][0], "Unknown"),
        "product": x[0],
        "amount": x[1][0][1]["amount"],
        "category": x[1][1]["category"],
        "weight": x[1][1]["weight"]
    }
)

print("\nComplete order details:")
for order in detailed_with_names.collect():
    print(f"  {order['customer']} bought {order['product']} (${order['amount']}) - {order['weight']}kg")


=== PATTERN: JOIN OPERATIONS ===
Customer orders (inner join):
  Alice from New York ordered laptop
  Alice from New York ordered tablet
  Bob from San Francisco ordered phone
  Charlie from Chicago ordered laptop

All customers with their orders (left outer join):
  Diana: No orders
  Alice: laptop
  Alice: tablet
  Bob: phone
  Charlie: laptop

Detailed order information:
  Product: laptop, Customer ID: 1
    Amount: $1200, Category: electronics
  Product: laptop, Customer ID: 3
    Amount: $1300, Category: electronics
  Product: phone, Customer ID: 2
    Amount: $800, Category: electronics
  Product: tablet, Customer ID: 1
    Amount: $600, Category: electronics
  Product: monitor, Customer ID: 5
    Amount: $400, Category: electronics

Complete order details:
  {'name': 'Alice', 'city': 'New York'} bought laptop ($1200) - 2.5kg
  {'name': 'Charlie', 'city': 'Chicago'} bought laptop ($1300) - 2.5kg
  {'name': 'Bob', 'city': 'San Francisco'} bought phone ($800) - 0.2kg
  {'name': 'A

# **Advanced Aggregation Pattern**

In [None]:
print("\n=== PATTERN: ADVANCED AGGREGATION ===")

# Sample transaction data
transactions = sc.parallelize([
    {"user_id": 1, "amount": 100.0, "category": "food", "timestamp": "2023-01-01"},
    {"user_id": 1, "amount": 50.0, "category": "transport", "timestamp": "2023-01-02"},
    {"user_id": 2, "amount": 200.0, "category": "food", "timestamp": "2023-01-01"},
    {"user_id": 1, "amount": 30.0, "category": "food", "timestamp": "2023-01-03"},
    {"user_id": 3, "amount": 150.0, "category": "entertainment", "timestamp": "2023-01-01"},
    {"user_id": 2, "amount": 80.0, "category": "transport", "timestamp": "2023-01-02"},
    {"user_id": 3, "amount": 25.0, "category": "food", "timestamp": "2023-01-03"},
    {"user_id": 1, "amount": 300.0, "category": "shopping", "timestamp": "2023-01-04"}
])

# User spending summary using aggregateByKey
def create_summary(amount, category):
    return {
        "total": amount,
        "count": 1,
        "categories": {category},
        "transactions": [amount]
    }

def merge_summaries(summary1, summary2):
    return {
        "total": summary1["total"] + summary2["total"],
        "count": summary1["count"] + summary2["count"],
        "categories": summary1["categories"].union(summary2["categories"]),
        "transactions": summary1["transactions"] + summary2["transactions"]
    }

def combine_values(summary, transaction):
    return {
        "total": summary["total"] + transaction["amount"],
        "count": summary["count"] + 1,
        "categories": summary["categories"].union({transaction["category"]}),
        "transactions": summary["transactions"] + [transaction["amount"]]
    }

# Create user summaries
user_summaries = (transactions
    .map(lambda t: (t["user_id"], t))
    .aggregateByKey(
        {"total": 0, "count": 0, "categories": set(), "transactions": []},
        lambda summary, transaction: combine_values(summary, transaction),
        lambda s1, s2: merge_summaries(s1, s2)
    ))

print("User spending summaries:")
for user_id, summary in sorted(user_summaries.collect()):
    avg_transaction = summary["total"] / summary["count"]
    max_transaction = max(summary["transactions"])
    min_transaction = min(summary["transactions"])

    print(f"  User {user_id}:")
    print(f"    Total spent: ${summary['total']:.2f}")
    print(f"    Transactions: {summary['count']}")
    print(f"    Average transaction: ${avg_transaction:.2f}")
    print(f"    Max transaction: ${max_transaction:.2f}")
    print(f"    Min transaction: ${min_transaction:.2f}")
    print(f"    Categories: {', '.join(summary['categories'])}")

# Category analysis across all users
category_analysis = (transactions
    .map(lambda t: (t["category"], t["amount"]))
    .aggregateByKey(
        {"total": 0, "count": 0, "amounts": []},
        lambda acc, amount: {
            "total": acc["total"] + amount,
            "count": acc["count"] + 1,
            "amounts": acc["amounts"] + [amount]
        },
        lambda acc1, acc2: {
            "total": acc1["total"] + acc2["total"],
            "count": acc1["count"] + acc2["count"],
            "amounts": acc1["amounts"] + acc2["amounts"]
        }
    )
    .mapValues(lambda stats: {
        **stats,
        "average": stats["total"] / stats["count"],
        "max": max(stats["amounts"]),
        "min": min(stats["amounts"])
    }))

print("\nCategory analysis:")
for category, stats in category_analysis.collect():
    print(f"  {category}:")
    print(f"    Total: ${stats['total']:.2f}")
    print(f"    Count: {stats['count']}")
    print(f"    Average: ${stats['average']:.2f}")
    print(f"    Range: ${stats['min']:.2f} - ${stats['max']:.2f}")


=== PATTERN: ADVANCED AGGREGATION ===
User spending summaries:
  User 1:
    Total spent: $480.00
    Transactions: 4
    Average transaction: $120.00
    Max transaction: $300.00
    Min transaction: $30.00
    Categories: shopping, transport, food
  User 2:
    Total spent: $280.00
    Transactions: 2
    Average transaction: $140.00
    Max transaction: $200.00
    Min transaction: $80.00
    Categories: transport, food
  User 3:
    Total spent: $175.00
    Transactions: 2
    Average transaction: $87.50
    Max transaction: $150.00
    Min transaction: $25.00
    Categories: entertainment, food

Category analysis:
  food:
    Total: $355.00
    Count: 4
    Average: $88.75
    Range: $25.00 - $200.00
  transport:
    Total: $130.00
    Count: 2
    Average: $65.00
    Range: $50.00 - $80.00
  entertainment:
    Total: $150.00
    Count: 1
    Average: $150.00
    Range: $150.00 - $150.00
  shopping:
    Total: $300.00
    Count: 1
    Average: $300.00
    Range: $300.00 - $300.00

# **Time Series Analysis Pattern**

In [None]:
print("\n=== PATTERN: TIME SERIES ANALYSIS ===")

# Sample time series data
time_series_data = sc.parallelize([
    {"date": "2023-01-01", "value": 100, "metric": "sales"},
    {"date": "2023-01-02", "value": 120, "metric": "sales"},
    {"date": "2023-01-03", "value": 90, "metric": "sales"},
    {"date": "2023-01-04", "value": 150, "metric": "sales"},
    {"date": "2023-01-05", "value": 200, "metric": "sales"},
    {"date": "2023-01-01", "value": 50, "metric": "visits"},
    {"date": "2023-01-02", "value": 60, "metric": "visits"},
    {"date": "2023-01-03", "value": 45, "metric": "visits"},
    {"date": "2023-01-04", "value": 75, "metric": "visits"},
    {"date": "2023-01-05", "value": 100, "metric": "visits"}
])

# Daily aggregation
daily_totals = (time_series_data
    .map(lambda record: (record["date"], record["value"]))
    .reduceByKey(lambda a, b: a + b)
    .sortByKey())

print("Daily totals:")
for date, total in daily_totals.collect():
    print(f"  {date}: {total}")

# Metric-wise analysis
metric_analysis = (time_series_data
    .map(lambda record: (record["metric"], record["value"]))
    .groupByKey()
    .mapValues(lambda values: {
        "values": list(values),
        "total": sum(values),
        "count": len(list(values)),
        "average": sum(values) / len(list(values)),
        "trend": "increasing" if list(values)[-1] > list(values)[0] else "decreasing"
    }))

print("\nMetric analysis:")
for metric, stats in metric_analysis.collect():
    print(f"  {metric}:")
    print(f"    Total: {stats['total']}")
    print(f"    Average: {stats['average']:.1f}")
    print(f"    Trend: {stats['trend']}")
    print(f"    Values: {stats['values']}")

# Moving averages (simplified 3-day window)
def calculate_moving_average(partition):
    data = sorted(list(partition), key=lambda x: x["date"])
    result = []

    for i in range(len(data)):
        if i >= 2:  # 3-day window
            window_sum = sum(data[j]["value"] for j in range(i-2, i+1))
            moving_avg = window_sum / 3
            result.append({
                "date": data[i]["date"],
                "value": data[i]["value"],
                "moving_avg": moving_avg
            })

    return result

sales_data = time_series_data.filter(lambda x: x["metric"] == "sales")
moving_averages = sales_data.mapPartitions(calculate_moving_average)

print("\nSales with 3-day moving average:")
for record in moving_averages.collect():
    print(f"  {record['date']}: Value={record['value']}, MA={record['moving_avg']:.1f}")


=== PATTERN: TIME SERIES ANALYSIS ===
Daily totals:
  2023-01-01: 150
  2023-01-02: 180
  2023-01-03: 135
  2023-01-04: 225
  2023-01-05: 300

Metric analysis:
  visits:
    Total: 330
    Average: 66.0
    Trend: increasing
    Values: [50, 60, 45, 75, 100]
  sales:
    Total: 660
    Average: 132.0
    Trend: increasing
    Values: [100, 120, 90, 150, 200]

Sales with 3-day moving average:
  2023-01-03: Value=90, MA=103.3
  2023-01-04: Value=150, MA=120.0
  2023-01-05: Value=200, MA=146.7


📚 Key Takeaways:

1. **Transformations are lazy** - They build up a computation plan
2. **Actions are eager** - They trigger execution of the entire plan  
3. **RDDs are immutable** - Operations create new RDDs
4. **Use appropriate operations** - reduceByKey vs groupByKey, coalesce vs repartition
5. **Cache frequently used RDDs** - Avoid recomputation
6. **Be careful with collect()** - Can crash driver with large datasets

🚀 Performance Best Practices:

- Prefer `reduceByKey` over `groupByKey`
- Use `mapPartitions` for expensive setup operations  
- Cache RDDs that are used multiple times
- Use appropriate number of partitions
- Avoid unnecessary shuffles
- Use built-in functions when available

🎯 Common Operations by Use Case:

| Use Case | Operations |
|----------|------------|
| Data Cleaning | filter, map, distinct |
| Aggregation | reduceByKey, groupByKey, aggregate |
| Transformation | map, flatMap, mapPartitions |
| Analysis | count, countByValue, top, take |
| Joining | join, leftOuterJoin, rightOuterJoin |


In [None]:
# Memory usage monitoring
import psutil
print(f"Memory usage: {psutil.virtual_memory().percent}%")

# Display DataFrames nicely
def show_rdd_sample(rdd, n=5):
    """Display first n elements of RDD in a nice format"""
    sample = rdd.take(n)
    for i, item in enumerate(sample, 1):
        print(f"{i:2d}: {item}")

# Save RDD to file (useful in Colab)
def save_rdd_to_colab(rdd, filename):
    """Save RDD results to a file in Colab"""
    with open(filename, 'w') as f:
        for item in rdd.collect():
            f.write(str(item) + '\n')
    print(f"Saved to {filename}")

# Load data from Colab files
def load_text_file_to_rdd(filename):
    """Load a text file from Colab into RDD"""
    with open(filename, 'r') as f:
        lines = f.readlines()
    return sc.parallelize([line.strip() for line in lines])

Memory usage: 14.5%
