In [None]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [None]:
import os

# Tell findspark / pyspark where the Java and Spark installations live
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.0.0-bin-hadoop3.2",
)

# findspark.init() makes the pyspark package under SPARK_HOME importable,
# so it has to run before the pyspark import below.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Local-mode session; "local[*]" uses every available core
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# **Creating an RDD from a List**

In [None]:
# Importing necessary libraries
from pyspark import SparkConf, SparkContext

# Obtain the SparkContext.
# Only one SparkContext may be active at a time, and the SparkSession created
# in the previous cell already started one, so calling the SparkContext(...)
# constructor here raises "ValueError: Cannot run multiple SparkContexts at
# once". getOrCreate() returns the active context when there is one and only
# builds a new context from the given configuration otherwise.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("RDD Basic Example")
)

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Get the active SparkSession, or create a local one when none exists.
# builder.getOrCreate() already covers both cases, so there is no need to
# call the SparkContext() constructor and catch the ValueError it raises
# when a context is already running.
spark = (
    SparkSession.builder
    .master("local")
    .appName("RDD Basic Example")
    .getOrCreate()
)
sc = spark.sparkContext

# Creating an RDD from a list of numbers
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

In [None]:
# Distribute a small Python list as an RDD
data = list(range(1, 6))  # [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# **RDD Transformations**

In [None]:
# Transformation: map() produces a new RDD holding the square of each element
rdd_squared = rdd.map(lambda value: value * value)


# Transformation: filter() produces a new RDD holding only the even elements
rdd_even = rdd.filter(lambda value: value % 2 == 0)


In [None]:
# Action: collect() returns the squared elements to the driver as a list
squared_values = rdd_squared.collect()
print(f"Squared Values: {squared_values}")

Squared Values: [1, 4, 9, 16, 25]


In [None]:
# Action: collect() returns the even elements to the driver as a list
even_values = rdd_even.collect()
print(f"Even Values: {even_values}")

Even Values: [2, 4]


In [None]:
# Action: reduce() combines the elements pairwise into a single total
sum_of_values = rdd.reduce(lambda left, right: left + right)
print(f"Sum of Values: {sum_of_values}")

# Action: collect() returns the even elements to the driver as a list
even_numbers = rdd_even.collect()
print(f"Even Numbers: {even_numbers}")

# Shut the SparkContext down; `sc` cannot run jobs after this call
sc.stop()

Sum of Values: 15
Even Numbers: [2, 4]


# **RDD Narrow Transformations**
In Apache Spark, narrow transformations are operations that are performed on individual partitions of an RDD without shuffling or redistributing the data across the cluster. These transformations are executed within the partitions, making them more efficient than wide transformations, which may require data shuffling across the cluster.

Here are some common narrow transformations in Spark RDDs along with examples:

**1. map(func):**

Applies a function to each element of the RDD and returns a new RDD.
The function can be any Python function or lambda expression.

In [None]:
from pyspark import SparkConf, SparkContext

# An earlier cell calls sc.stop(), so `sc` may refer to a stopped context.
# getOrCreate() returns the active context, or starts a new local one when
# none is running, which keeps this cell runnable in a top-to-bottom run.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Narrow Transformations Example")
)

# Transformation: Squaring each element in the RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd_squared = rdd.map(lambda x: x*x)
# Action: Collecting squared values
squared_values = rdd_squared.collect()
print("Squared Values:", squared_values)

Squared Values: [1, 4, 9, 16, 25]


**2. filter(func):**

Returns a new RDD containing only the elements that satisfy the given predicate (function).
The predicate function should return True or False.

In [None]:
from pyspark import SparkConf, SparkContext

# An earlier cell calls sc.stop(), so `sc` may refer to a stopped context.
# getOrCreate() returns the active context, or starts a new local one when
# none is running.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Narrow Transformations Example")
)

rdd = sc.parallelize([1, 2, 3, 4, 5])
# Transformation: keep only the even numbers
rdd_even = rdd.filter(lambda x: x % 2 == 0)
# Action: collecting the even numbers
even_values = rdd_even.collect()
# Label fixed: this cell prints the filtered (even) values, not squares
print("Even Values:", even_values)

Squared Values: [2, 4]


**3. flatMap(func):**

Similar to map, but each input item can be mapped to 0 or more output items.
The output is flattened into a single RDD.

In [None]:
from pyspark import SparkConf, SparkContext

# An earlier cell calls sc.stop(), so `sc` may refer to a stopped context.
# getOrCreate() returns the active context, or starts a new local one when
# none is running.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Narrow Transformations Example")
)

rdd = sc.parallelize(["Hello World", "Spark is awesome"])
# Transformation: split every line into words and flatten the result
rdd_flat = rdd.flatMap(lambda line: line.split())
# Action: collecting the individual words
rdd_flat_values=rdd_flat.collect()
print(rdd_flat_values)

['Hello', 'World', 'Spark', 'is', 'awesome']


In [None]:
from pyspark import SparkConf, SparkContext

# Initialize SparkContext.
# getOrCreate() reuses a context that is still active (in which case that
# context's own settings apply) instead of raising ValueError the way the
# SparkContext(...) constructor does when a context is already running.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Narrow Transformations Example")
)

# Create an RDD from a list of sentences
sentences = ["Hello world", "Spark is awesome"]
rdd = sc.parallelize(sentences)

# Apply flatMap transformation to split words
words_rdd = rdd.flatMap(lambda sentence: sentence.split())

# Action: Collect and print results
print("Original RDD:", rdd.collect())
print("Words RDD:", words_rdd.collect())

# Stop SparkContext
sc.stop()

Original RDD: ['Hello world', 'Spark is awesome']
Words RDD: ['Hello', 'world', 'Spark', 'is', 'awesome']


**4. mapPartitions(func):**

Similar to map, but operates on each partition of the RDD independently.
The function is applied to an iterator of elements within each partition.

In [None]:
from pyspark import SparkConf, SparkContext

# The previous code cell ends with sc.stop(), so make sure a context is
# running: getOrCreate() returns the active context or starts a new local one.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Narrow Transformations Example")
)

rdd = sc.parallelize([1, 2, 3, 4, 5], 2)  # Creating RDD with 2 partitions
def map_partition_func(iterator):
    """Return a one-element list holding the sum of one partition's elements."""
    return [sum(iterator)]
rdd_mapped = rdd.mapPartitions(map_partition_func)
# Action: collecting one sum per partition
rdd_mapped_values=rdd_mapped.collect()
print(rdd_mapped_values)

[3, 12]


**5. mapPartitionsWithIndex(func):**

Similar to mapPartitions, but also provides the index of the partition.
Useful for tasks that require knowing the partition index.

In [None]:
from pyspark import SparkConf, SparkContext

# An earlier cell calls sc.stop(), so make sure a context is running:
# getOrCreate() returns the active context or starts a new local one.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Narrow Transformations Example")
)

rdd = sc.parallelize([1, 2, 3, 4, 5], 2)  # Creating RDD with 2 partitions
def map_partition_func_with_index(partition_index, iterator):
    """Return [(partition_index, sum of that partition's elements)]."""
    return [(partition_index, sum(iterator))]
rdd_mapped_with_index = rdd.mapPartitionsWithIndex(map_partition_func_with_index)
# Action: collecting one (index, sum) pair per partition.
# The result gets its own name so `rdd_mapped_with_index` keeps referring to
# the RDD instead of being rebound to a plain list.
mapped_values_with_index = rdd_mapped_with_index.collect()
print(mapped_values_with_index)

[(0, 3), (1, 12)]


In [None]:
# Shut down the SparkContext bound to `sc`; a later cell has to obtain a new
# context before it can use `sc` again.
sc.stop()

In [None]:
from pyspark import SparkConf, SparkContext

# The previous cell calls sc.stop(), so make sure a context is running:
# getOrCreate() returns the active context or starts a new local one.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Narrow Transformations Example")
)

# Example: mapPartitions
def map_partition_func(iterator):
    """Return a one-element list holding the sum of one partition's elements."""
    return [sum(iterator)]

rdd = sc.parallelize([1, 2, 3, 4, 5], 2)  # Creating RDD with 2 partitions
rdd_mapped = rdd.mapPartitions(map_partition_func)

# Action: Collecting values after mapPartitions transformation
mapped_values = rdd_mapped.collect()
print("Mapped Values (mapPartitions):", mapped_values)

# Example: mapPartitionsWithIndex
def map_partition_func_with_index(partition_index, iterator):
    """Return [(partition_index, sum of that partition's elements)]."""
    return [(partition_index, sum(iterator))]

rdd = sc.parallelize([1, 2, 3, 4, 5], 2)  # Creating RDD with 2 partitions
rdd_mapped_with_index = rdd.mapPartitionsWithIndex(map_partition_func_with_index)

# Action: Collecting values after mapPartitionsWithIndex transformation
mapped_values_with_index = rdd_mapped_with_index.collect()
print("Mapped Values with Index (mapPartitionsWithIndex):", mapped_values_with_index)

Mapped Values (mapPartitions): [3, 12]
Mapped Values with Index (mapPartitionsWithIndex): [(0, 3), (1, 12)]


# **Wide Transformations**
**Wide transformations (referred to as "wider" transformations elsewhere in this notebook), also known as shuffle-based transformations, move data between partitions and typically produce a new set of partitions. Unlike narrow transformations, which operate on each partition independently, wide transformations require data exchange and coordination across the cluster, which incurs higher computational and network overhead. Here are some common wide transformations in Apache Spark:**

**1.groupByKey:**

Groups the values of an RDD with the same key into a single list.
This transformation can lead to a significant amount of data shuffling, especially if the data associated with a single key is spread across multiple partitions.

**2. reduceByKey:**

Similar to groupByKey, but performs reduction on the values for each key.
It combines values for each key using an associative and commutative function (like sum, max, min, etc.).
This transformation also involves data shuffling as it needs to bring together all values for each key across partitions.

**3. sortByKey:**

Sorts the elements of an RDD by their keys.
This transformation shuffles the data with a range partitioner, so that each output partition holds a contiguous range of keys, and then sorts the elements within each partition.

**4.join:**

Performs an inner join between two RDDs based on their keys.
Data from both RDDs with the same key is brought together for processing, which requires data shuffling.
Depending on the data distribution and partitioning strategy, joining large RDDs can lead to significant shuffle overhead.

**5.cogroup:**

Groups the values of two RDDs with the same key into an iterator of tuples.
This transformation requires data shuffling as it brings together values from both RDDs with the same key.

**6.distinct:**

Returns a new RDD containing distinct elements from the original RDD.
Achieving distinct elements across partitions involves data shuffling to identify and eliminate duplicates.

**7.union:**

Concatenates two RDDs into a single RDD.
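`union` itself does not shuffle data: it simply concatenates the partitions of both RDDs (so it is technically a narrow transformation) and keeps duplicates. It is listed here because the combined RDD often needs to be re-partitioned afterwards to keep data balanced across partitions, and that re-partitioning is a shuffle.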

**8.cartesian:**

Computes the Cartesian product of two RDDs, generating all possible pairs of elements (one from each RDD).
This transformation involves shuffling to combine elements from different partitions of the input RDDs.

These wider transformations are powerful but should be used judiciously as they can incur significant overhead, especially when dealing with large datasets. Efficient use of partitioning and caching can help mitigate the performance impact of wider transformations in Spark applications.

In [None]:
from pyspark import SparkConf, SparkContext

# Initialize SparkContext.
# getOrCreate() reuses a context that is still active (in which case that
# context's own settings apply) instead of raising ValueError the way the
# SparkContext(...) constructor does when a context is already running.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Wider Transformations Example")
)

# Example RDDs
data1 = [("apple", 1), ("banana", 2), ("apple", 3), ("banana", 4), ("orange", 5)]
data2 = [("apple", "red"), ("banana", "yellow"), ("orange", "orange")]

# Create RDDs
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)

# Wider transformations

# groupByKey: Group values by key (mapValues(list) turns each group into a list)
grouped_rdd = rdd1.groupByKey().mapValues(list)

# reduceByKey: Reduce values by key (here: sum the values of each key)
sum_by_key_rdd = rdd1.reduceByKey(lambda x, y: x + y)

# sortByKey: Sort RDD elements by key
sorted_rdd = rdd2.sortByKey()

# join: Inner join two RDDs based on keys
joined_rdd = rdd1.join(rdd2)

# Printing the results
print("groupByKey Result:", grouped_rdd.collect())
print("reduceByKey Result:", sum_by_key_rdd.collect())
print("sortByKey Result:", sorted_rdd.collect())
print("join Result:", joined_rdd.collect())

# Stop SparkContext
sc.stop()

groupByKey Result: [('apple', [1, 3]), ('banana', [2, 4]), ('orange', [5])]
reduceByKey Result: [('apple', 4), ('banana', 6), ('orange', 5)]
sortByKey Result: [('apple', 'red'), ('banana', 'yellow'), ('orange', 'orange')]
join Result: [('banana', (2, 'yellow')), ('banana', (4, 'yellow')), ('orange', (5, 'orange')), ('apple', (1, 'red')), ('apple', (3, 'red'))]


# Example 1: groupByKey

In [None]:
from pyspark import SparkConf, SparkContext

# Initialize SparkContext.
# getOrCreate() reuses a context that is still active instead of raising
# ValueError the way the SparkContext(...) constructor does, so the cell can
# be re-run even if an earlier run failed before reaching sc.stop().
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Wider Transformations Example")
)

# Create an RDD from a list of key-value pairs
data = [("cat", 1), ("dog", 2), ("cat", 3), ("dog", 4), ("cat", 5)]
rdd = sc.parallelize(data)

# Apply groupByKey transformation to group values by key
grouped_rdd = rdd.groupByKey()

# Action: Collect and print results.
# groupByKey yields an iterable per key; mapValues(list) turns each one into
# a list so the groups print readably.
print("Original RDD:", rdd.collect())
print("Grouped RDD:", grouped_rdd.mapValues(list).collect())

# Stop SparkContext
sc.stop()

Original RDD: [('cat', 1), ('dog', 2), ('cat', 3), ('dog', 4), ('cat', 5)]
Grouped RDD: [('cat', [1, 3, 5]), ('dog', [2, 4])]


In [None]:
# Extra stop(): the cell above already ends with sc.stop().
# NOTE(review): presumably a leftover safety net - confirm it is still wanted.
sc.stop()

# Example 2: reduceByKey

In [None]:
from pyspark import SparkConf, SparkContext

# Initialize SparkContext.
# getOrCreate() reuses a context that is still active instead of raising
# ValueError the way the SparkContext(...) constructor does, so the cell can
# be re-run even if an earlier run failed before reaching sc.stop().
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Wider Transformations Example")
)

# Create an RDD from a list of key-value pairs
data = [("cat", 1), ("dog", 2), ("cat", 3), ("dog", 4), ("cat", 5)]
rdd = sc.parallelize(data)

# Apply reduceByKey transformation to sum values by key
sum_by_key_rdd = rdd.reduceByKey(lambda x, y: x + y)

# Action: Collect and print results
print("Original RDD:", rdd.collect())
print("Reduced by Key RDD:", sum_by_key_rdd.collect())

# Stop SparkContext
sc.stop()

Original RDD: [('cat', 1), ('dog', 2), ('cat', 3), ('dog', 4), ('cat', 5)]
Reduced by Key RDD: [('cat', 9), ('dog', 6)]


# Example 3: join

In [None]:
from pyspark import SparkConf, SparkContext

# Initialize SparkContext.
# getOrCreate() reuses a context that is still active instead of raising
# ValueError the way the SparkContext(...) constructor does, so the cell can
# be re-run even if an earlier run failed before reaching sc.stop().
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Wider Transformations Example")
)

# Create RDDs
rdd1 = sc.parallelize([("cat", 1), ("dog", 2), ("cat", 3)])
rdd2 = sc.parallelize([("cat", "white"), ("dog", "black"), ("cat", "gray")])

# Apply join transformation to perform inner join by key.
# Every value of a key in rdd1 is paired with every value of that key in rdd2.
joined_rdd = rdd1.join(rdd2)

# Action: Collect and print results
print("RDD1:", rdd1.collect())
print("RDD2:", rdd2.collect())
print("Joined RDD:", joined_rdd.collect())

# Stop SparkContext
sc.stop()

RDD1: [('cat', 1), ('dog', 2), ('cat', 3)]
RDD2: [('cat', 'white'), ('dog', 'black'), ('cat', 'gray')]
Joined RDD: [('cat', (1, 'white')), ('cat', (1, 'gray')), ('cat', (3, 'white')), ('cat', (3, 'gray')), ('dog', (2, 'black'))]


# Example 1: Creating an RDD from a List and Performing Basic Transformations

In [None]:
from pyspark import SparkConf, SparkContext

# Initialize SparkContext.
# getOrCreate() reuses a context that is still active instead of raising
# ValueError the way the SparkContext(...) constructor does, so the cell can
# be re-run even if an earlier run failed before reaching sc.stop().
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("RDD Examples")
)

# Create an RDD from a list
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Perform transformations
squared_rdd = rdd.map(lambda x: x*x)
even_rdd = rdd.filter(lambda x: x % 2 == 0)

# Action: Collect and print results
print("Original RDD:", rdd.collect())
print("Squared RDD:", squared_rdd.collect())
print("Even Numbers RDD:", even_rdd.collect())

# Stop SparkContext
sc.stop()

In [None]:
# Extra stop(): the cell above already ends with sc.stop().
# NOTE(review): presumably a leftover safety net - confirm it is still wanted.
sc.stop()

# Example 2: Cartesian Product of Two RDDs

In [None]:
from pyspark import SparkConf, SparkContext

# Initialize SparkContext.
# getOrCreate() reuses a context that is still active instead of raising
# ValueError the way the SparkContext(...) constructor does, so the cell can
# be re-run even if an earlier run failed before reaching sc.stop().
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Cartesian Product Example")
)

# Create RDDs
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize(['a', 'b', 'c'])

# Compute Cartesian product: every element of rdd1 paired with every element of rdd2
cartesian_rdd = rdd1.cartesian(rdd2)

# Action: Collect and print results
print("Cartesian Product:")
for pair in cartesian_rdd.collect():
    print(pair)

# Stop SparkContext
sc.stop()

Cartesian Product:
(1, 'a')
(1, 'b')
(1, 'c')
(2, 'a')
(2, 'b')
(2, 'c')
(3, 'a')
(3, 'b')
(3, 'c')


# Example 3: Word Count using flatMap and reduceByKey

In [None]:
from pathlib import Path

from pyspark import SparkConf, SparkContext

# The input file is not created anywhere in this notebook. Write a small
# sample when it is missing so the cell runs on a fresh kernel; an existing
# file is left untouched.
sample_path = Path("sample_text.txt")
if not sample_path.exists():
    sample_path.write_text("Hello world\nSpark is awesome\nHello Spark\n")

# Initialize SparkContext.
# getOrCreate() reuses a context that is still active instead of raising
# ValueError the way the SparkContext(...) constructor does.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("WordCount Example")
)

try:
    # Create an RDD from text file (one element per line)
    lines_rdd = sc.textFile(str(sample_path))

    # Split lines into words and flatten
    words_rdd = lines_rdd.flatMap(lambda line: line.split())

    # Map each word to (word, 1) tuple and reduce by key to get word count
    word_count_rdd = words_rdd.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)

    # Action: Collect and print results
    print("Word Count:")
    for word, count in word_count_rdd.collect():
        print(word, ": ", count)
finally:
    # Stop SparkContext even when reading or counting fails, so a failed run
    # does not leave a context behind for the next cell.
    sc.stop()