<a href="https://colab.research.google.com/github/RajashekarAllala/Python_For_Data_Engineering/blob/main/PySpark/01_PySpark_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install PySpark
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
from pyspark.sql import SparkSession, DataFrame

# Start a Spark session named "Spark DataFrames", or reuse one that already
# exists; the SparkContext used for the RDD examples is taken from it.
spark = (
    SparkSession.builder
    .appName("Spark DataFrames")
    .getOrCreate()
)

In [7]:
# Create an RDD from an in-memory Python list
sc = spark.sparkContext  # SparkContext: entry point for the RDD API
data = list(range(1, 6))  # [1, 2, 3, 4, 5]
# Distribute the local list as an RDD
rdd = sc.parallelize(data)
# Bring every element back to the driver so the cell displays them
rdd.collect()

[1, 2, 3, 4, 5]

In [12]:
# Create an RDD from a text file: each line of the file becomes one string element
# NOTE(review): this path presumably needs Google Drive mounted at /content/drive;
# no mount step is visible in this notebook - confirm before a fresh run.
numbers_path = "/content/drive/MyDrive/Data_Sets/numbers.txt"
rdd1 = sc.textFile(numbers_path)
# Look at the first 10 elements only instead of collecting the whole file
rdd1.take(10)

['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

In [14]:
# Create an RDD from a range

# sc.range builds the distributed RDD directly: 0, 5, 10, ... up to (not including) 50
rdd2 = sc.range(start=0, end=50, step=5)
# rdd2 is already an RDD, so there is no need to collect() it to the driver and
# parallelize() it again. The name rdd3 is kept because later cells refer to it.
rdd3 = rdd2
rdd3.collect()

[0, 5, 10, 15, 20, 25, 30, 35, 40, 45]

**Lambda Function**

* It's a small anonymous function.
* It can take any number of arguments.
* It must have only one expression (no statements or multi-line logic).
* It returns the value of that expression automatically; no `return` keyword is needed.

In [31]:
# Lambda with one argument: square a number
square = lambda x: x ** 2
result = square(5)
print(f"Square of 5 is: {result}")

# Lambda with two arguments: add them together
add = lambda a, b: a + b
result = add(3, 7)
print(f"Addition of a and b: {result}")

# Lambda passed straight to map(): applied to every element of the list
numbers = list(range(1, 6))  # [1, 2, 3, 4, 5]
squares = list(map(lambda x: x ** 2, numbers))
print(f"Squares of numbers: {squares}")

Square of 5 is: 25
Addition of a and b: 10
Squares of numbers: [1, 4, 9, 16, 25]


In [21]:

# map() applies the lambda to every element of rdd3 and returns a new RDD of squares
sqrs = rdd3.map(lambda n: n * n)
# take(5) brings only the first five results back to the driver
print(f"Squares of 5 numbers from the RDD3: {sqrs.take(5)}")

Squares of 5 numbers from the RDD3: [0, 25, 100, 225, 400]


In [22]:
# Multiply every number by 5 with map()
numbers = list(range(1, 6))  # [1, 2, 3, 4, 5]
# Parallelize and transform in one chain; num_rdd holds the multiplied values
num_rdd = sc.parallelize(numbers).map(lambda n: n * 5)
num_rdd.collect()

[5, 10, 15, 20, 25]

In [23]:
# Original list: ["quick", "bright", "silent"]
# Use map() to append the suffix "ly" to each word.
words = ["quick", "bright", "silent"]
# Parallelize and transform in one chain; words_rdd holds the suffixed words
words_rdd = sc.parallelize(words).map(lambda word: f"{word}ly")
words_rdd.collect()

['quickly', 'brightly', 'silently']

In [40]:
import random

# Five random integers between 1 and 100 (both ends included)
# NOTE(review): no seed is set, so the values differ on every run.
data = [random.randint(1, 100) for _ in range(5)]
rdd = sc.parallelize(data)
# Label each number with its parity: "<n> - Even" or "<n> - Odd"
result = rdd.map(lambda n: str(n) + (" - Even" if n % 2 == 0 else " - Odd"))
result.collect()

['32 - Even', '47 - Odd', '24 - Even', '53 - Odd', '61 - Odd']

In [41]:
# flatMap: split every sentence into words and flatten them into one RDD of words
sentences = ["Hello world", "Spark is great", "Map and flatMap"]
sentences_rdd = sc.parallelize(sentences)
# Each sentence yields a list of words; flatMap merges those lists into a single RDD
words_rdd = sentences_rdd.flatMap(lambda sentence: sentence.split(" "))
words_rdd.collect()

['Hello', 'world', 'Spark', 'is', 'great', 'Map', 'and', 'flatMap']

In [42]:
# flatMap example 2: break every word into its individual characters
words = ["apple", "banana", "grape"]
words_rdd = sc.parallelize(words)
# list(word) yields the word's characters; flatMap merges them into a single RDD
chars_rdd = words_rdd.flatMap(lambda word: list(word))
chars_rdd.collect()

['a',
 'p',
 'p',
 'l',
 'e',
 'b',
 'a',
 'n',
 'a',
 'n',
 'a',
 'g',
 'r',
 'a',
 'p',
 'e']

In [43]:
# reduceByKey: total sales per product
# Each record is a (product, amount) pair
sales = [
    ("apple", 100),
    ("banana", 200),
    ("apple", 150),
    ("orange", 300),
    ("banana", 50),
]
sales_rdd = sc.parallelize(sales)
# Amounts that share the same product key are added together
total_sales = sales_rdd.reduceByKey(lambda total, amount: total + amount)
total_sales.collect()

[('apple', 250), ('banana', 250), ('orange', 300)]

In [52]:
# reduceByKey: count how often each word occurs
words = ["apple", "banana", "apple", "orange", "banana", "apple"]
words_rdd = sc.parallelize(words)
# Step 1: pair every word with an initial count of 1
word_map = words_rdd.map(lambda word: (word, 1))
print(f"Converting each word into a pair - (word, 1): {word_map.collect()}")
# Step 2: add up the counts that belong to the same word
word_counts = word_map.reduceByKey(lambda left, right: left + right)
print(f"After Adding the values for each unique word key: {word_counts.collect()}")

Converting each word into a pair - (word, 1): [('apple', 1), ('banana', 1), ('apple', 1), ('orange', 1), ('banana', 1), ('apple', 1)]
After Adding the values for each unique word key: [('apple', 3), ('banana', 2), ('orange', 1)]


In [53]:
# SortByKey
# List of (student name, score) pairs - the name is the key, the score is the value
student_scores = [("John", 88), ("Alice", 95), ("Bob", 78), ("Diana", 85)]
student_scores_rdd = sc.parallelize(student_scores)
# sortByKey orders by the key (the student name), NOT by the score:
# this sorts the names in descending alphabetical order
sorted_scores = student_scores_rdd.sortByKey(ascending=False)
sorted_scores.collect()

[('John', 88), ('Diana', 85), ('Bob', 78), ('Alice', 95)]

In [56]:
# SortByKey - list of (product name, sales) pairs; the product name is the key
sales_data = [("apple", 150), ("banana", 200), ("orange", 300), ("grape", 100)]
sales_rdd = sc.parallelize(sales_data)
# sortByKey orders by the key (the product name), NOT by the sales amount:
# this sorts the products in ascending alphabetical order
sorted_sales = sales_rdd.sortByKey(ascending=True)
sorted_sales.collect()

[('apple', 150), ('banana', 200), ('grape', 100), ('orange', 300)]

In [58]:
# sortBy: order records by a value computed from each record
# List of (student name, score) pairs
student_scores = [("John", 88), ("Alice", 95), ("Bob", 78), ("Diana", 85)]
student_scores_rdd = sc.parallelize(student_scores)
# Element [1] of each pair is the score; ascending=False puts the highest score first
sorted_scores = student_scores_rdd.sortBy(keyfunc=lambda record: record[1], ascending=False)
sorted_scores.collect()

[('Alice', 95), ('John', 88), ('Diana', 85), ('Bob', 78)]

In [61]:
# sortBy: order words by their length
words = ["banana", "apple", "grape", "pineapple", "orange"]
words_rdd = sc.parallelize(words)
# The sort key is the word length; ascending=True puts the shortest words first
sorted_words = words_rdd.sortBy(keyfunc=lambda word: len(word), ascending=True)
sorted_words.collect()

['apple', 'grape', 'banana', 'orange', 'pineapple']