In [3]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=3a4978986a3fa3caeffb0fb13054318be99ac611c0a68bfdea90c71a8e5df84c
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [5]:
from pyspark import SparkContext, SparkConf

# Initialize Spark
conf = SparkConf().setAppName("RDD_Example").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Create a dummy dataset
books_data = [
    ("Python Basics", "John Doe", 2020, 25.99, ["Programming", "Beginner"]),
    ("Advanced Java", "Jane Smith", 2019, 34.99, ["Programming", "Advanced"]),
    ("Data Science Essentials", "Bob Johnson", 2021, 29.99, ["Data Science", "Intermediate"]),
    ("Machine Learning 101", "Alice Brown", 2020, 27.99, ["ML", "Beginner"]),
    ("Deep Learning Advanced", "Charlie Wilson", 2021, 39.99, ["ML", "Advanced"]),
    ("SQL Mastery", "Eve Davis", 2018, 22.99, ["Database", "Advanced"]),
    ("Web Development Fundamentals", "Frank Miller", 2019, 24.99, ["Web", "Beginner"]),
    ("Cloud Computing Basics", "Grace Lee", 2020, 26.99, ["Cloud", "Beginner"]),
    ("Artificial Intelligence", "Henry Taylor", 2021, 36.99, ["AI", "Intermediate"]),
    ("Cybersecurity Essentials", "Ivy Chen", 2020, 28.99, ["Security", "Intermediate"])
]


In [16]:
books_data

[('Python Basics', 'John Doe', 2020, 25.99, ['Programming', 'Beginner']),
 ('Advanced Java', 'Jane Smith', 2019, 34.99, ['Programming', 'Advanced']),
 ('Data Science Essentials',
  'Bob Johnson',
  2021,
  29.99,
  ['Data Science', 'Intermediate']),
 ('Machine Learning 101', 'Alice Brown', 2020, 27.99, ['ML', 'Beginner']),
 ('Deep Learning Advanced', 'Charlie Wilson', 2021, 39.99, ['ML', 'Advanced']),
 ('SQL Mastery', 'Eve Davis', 2018, 22.99, ['Database', 'Advanced']),
 ('Web Development Fundamentals',
  'Frank Miller',
  2019,
  24.99,
  ['Web', 'Beginner']),
 ('Cloud Computing Basics', 'Grace Lee', 2020, 26.99, ['Cloud', 'Beginner']),
 ('Artificial Intelligence',
  'Henry Taylor',
  2021,
  36.99,
  ['AI', 'Intermediate']),
 ('Cybersecurity Essentials',
  'Ivy Chen',
  2020,
  28.99,
  ['Security', 'Intermediate'])]

In [6]:
#Create an RDD
books_rdd = sc.parallelize(books_data)

# Map: Extract book titles
titles_rdd = books_rdd.map(lambda x: x[0])
print("Book Titles:", titles_rdd.collect())

Book Titles: ['Python Basics', 'Advanced Java', 'Data Science Essentials', 'Machine Learning 101', 'Deep Learning Advanced', 'SQL Mastery', 'Web Development Fundamentals', 'Cloud Computing Basics', 'Artificial Intelligence', 'Cybersecurity Essentials']


In [7]:
# Filter: Get books published in 2020
books_2020 = books_rdd.filter(lambda x: x[2] == 2020)
print("Books published in 2020:", books_2020.collect())

Books published in 2020: [('Python Basics', 'John Doe', 2020, 25.99, ['Programming', 'Beginner']), ('Machine Learning 101', 'Alice Brown', 2020, 27.99, ['ML', 'Beginner']), ('Cloud Computing Basics', 'Grace Lee', 2020, 26.99, ['Cloud', 'Beginner']), ('Cybersecurity Essentials', 'Ivy Chen', 2020, 28.99, ['Security', 'Intermediate'])]


In [8]:

# FlatMap: Get all tags
all_tags = books_rdd.flatMap(lambda x: x[4])
print("All tags:", all_tags.distinct().collect())
print("Total number of books:", books_rdd.count())
total_price = books_rdd.map(lambda x: x[3]).reduce(lambda x, y: x + y)
print("Total price of all books:", total_price)



All tags: ['Programming', 'Cloud', 'Security', 'Beginner', 'Advanced', 'Data Science', 'Intermediate', 'ML', 'Database', 'Web', 'AI']
Total number of books: 10
Total price of all books: 299.9


In [10]:
# Create a key-value RDD with author as key
author_books = books_rdd.map(lambda x: (x[1], x[0]))

# GroupByKey: Group books by author
books_by_author = author_books.groupByKey()
print("Books grouped by author:", books_by_author.mapValues(list).collect())

# ReduceByKey: Count books per author
books_count_by_author = author_books.mapValues(lambda x: 1).reduceByKey(lambda x, y: x + y)
print("Number of books per author:", books_count_by_author.collect())


Books grouped by author: [('Jane Smith', ['Advanced Java']), ('Bob Johnson', ['Data Science Essentials']), ('Charlie Wilson', ['Deep Learning Advanced']), ('Frank Miller', ['Web Development Fundamentals']), ('Henry Taylor', ['Artificial Intelligence']), ('John Doe', ['Python Basics']), ('Alice Brown', ['Machine Learning 101']), ('Eve Davis', ['SQL Mastery']), ('Grace Lee', ['Cloud Computing Basics']), ('Ivy Chen', ['Cybersecurity Essentials'])]
Number of books per author: [('Jane Smith', 1), ('Bob Johnson', 1), ('Charlie Wilson', 1), ('Frank Miller', 1), ('Henry Taylor', 1), ('John Doe', 1), ('Alice Brown', 1), ('Eve Davis', 1), ('Grace Lee', 1), ('Ivy Chen', 1)]


In [11]:
# Join: Join book titles with their prices
titles_prices = books_rdd.map(lambda x: (x[0], x[3]))
joined_data = titles_rdd.zip(titles_prices)
print("Joined data (title, (title, price)):", joined_data.collect())

# Aggregate: Find average price of books
initial_value = (0, 0)  # (sum, count)
seqOp = lambda acc, value: (acc[0] + value[3], acc[1] + 1)
combOp = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
sum_count = books_rdd.aggregate(initial_value, seqOp, combOp)
avg_price = sum_count[0] / sum_count[1]
print("Average book price:", avg_price)

# Custom Partitioning
def custom_partitioner(key):
    if "Programming" in key:
        return 0
    elif "Data Science" in key or "ML" in key:
        return 1
    else:
        return 2

# Repartition the data based on categories
categorized_books = books_rdd.map(lambda x: (x[4][0], x))
partitioned_books = categorized_books.partitionBy(3, custom_partitioner)

# Show the partitions
for i in range(3):
    print(f"Partition {i}:", partitioned_books.glom().collect()[i])


Joined data (title, (title, price)): [('Python Basics', ('Python Basics', 25.99)), ('Advanced Java', ('Advanced Java', 34.99)), ('Data Science Essentials', ('Data Science Essentials', 29.99)), ('Machine Learning 101', ('Machine Learning 101', 27.99)), ('Deep Learning Advanced', ('Deep Learning Advanced', 39.99)), ('SQL Mastery', ('SQL Mastery', 22.99)), ('Web Development Fundamentals', ('Web Development Fundamentals', 24.99)), ('Cloud Computing Basics', ('Cloud Computing Basics', 26.99)), ('Artificial Intelligence', ('Artificial Intelligence', 36.99)), ('Cybersecurity Essentials', ('Cybersecurity Essentials', 28.99))]
Average book price: 29.99
Partition 0: [('Programming', ('Python Basics', 'John Doe', 2020, 25.99, ['Programming', 'Beginner'])), ('Programming', ('Advanced Java', 'Jane Smith', 2019, 34.99, ['Programming', 'Advanced']))]
Partition 1: [('Data Science', ('Data Science Essentials', 'Bob Johnson', 2021, 29.99, ['Data Science', 'Intermediate'])), ('ML', ('Machine Learning 101

In [14]:

books_rdd.cache()

author_nationality = sc.broadcast({"John Doe": "USA", "Jane Smith": "UK", "Bob Johnson": "Canada"})

def add_nationality(book):
    author = book[1]
    nationality = author_nationality.value.get(author, "Unknown")
    return book + (nationality,)

books_with_nationality = books_rdd.map(add_nationality)
print("Books with author nationality:", books_with_nationality.take(3))



Books with author nationality: [('Python Basics', 'John Doe', 2020, 25.99, ['Programming', 'Beginner'], 'USA'), ('Advanced Java', 'Jane Smith', 2019, 34.99, ['Programming', 'Advanced'], 'UK'), ('Data Science Essentials', 'Bob Johnson', 2021, 29.99, ['Data Science', 'Intermediate'], 'Canada')]


In [15]:
# Clean up
sc.stop()