
# Apache Spark Fundamentals: RDDs


In this notebook we will work with RDDs, which are part of Spark Core. The central abstraction of Spark Core is the **RDD (Resilient Distributed Dataset)**, a collection of data partitioned across the nodes of a cluster and processed in parallel.

We will use the PySpark API, but the concepts apply equally to the other Spark APIs (Scala, Java, etc.).

### Initializing Spark on Notebooks

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [None]:
#!conda install -c conda-forge findspark

In [None]:
import pandas as pd
import pyspark

### Create the SparkSession and SparkContext

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_training')\
        .getOrCreate()

In [None]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Create an RDD from a collection

In [None]:
num = [1,2,3,4,5]

num_rdd = sc.parallelize(num)
num_rdd.collect()

[1, 2, 3, 4, 5]

# Transformations
* Transformations are lazy: they are not executed until an action is called on the resulting RDD.
* Let's try to understand the different transformations available.
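
To make the laziness concrete without a cluster, here is a plain-Python sketch that uses Python's built-in lazy `map` as a stand-in for a transformation. It is only an analogy, not how Spark is implemented, and the names `double`, `calls`, and `pipeline` are invented for this illustration.

```python
# Plain-Python analogy for lazy transformations (no Spark required).
calls = []  # records every time the function actually runs


def double(x):
    calls.append(x)
    return x * 2


data = [1, 2, 3, 4, 5]

# "Transformation": building the pipeline runs nothing yet.
pipeline = map(double, data)
calls_before_action = len(calls)

# "Action": consuming the pipeline forces the computation.
result = list(pipeline)
calls_after_action = len(calls)

print(calls_before_action)  # 0
print(result)               # [2, 4, 6, 8, 10]
print(calls_after_action)   # 5
```

In the same way, `num_rdd.map(...)` only records what should be done; the work happens when an action such as `collect()` is called.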

### map
* map transforms each element of the RDD by applying the function passed to it, producing one output element per input element.

In [None]:
double_rdd = num_rdd.map(lambda x : x * 2)
double_rdd.collect()

[2, 4, 6, 8, 10]

### filter
* filter keeps only the elements that satisfy a given condition. Let's try to find the even numbers in num_rdd.

In [None]:
even_rdd = num_rdd.filter(lambda x : x % 2 == 0)
even_rdd.collect()

[2, 4]

### flatMap
* This function is very similar to map, but the function can return multiple elements (or none) for each entry in the given RDD, and the results are flattened into a single RDD.

In [None]:
flat_rdd = num_rdd.flatMap(lambda x : range(1,x))
flat_rdd.collect()

[1, 1, 2, 1, 2, 3, 1, 2, 3, 4]
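
The difference between map and flatMap is easiest to see side by side. The following is a plain-Python sketch of the two behaviours, assuming the same input as `num_rdd`; it does not use Spark, and the inner ranges are converted to lists only so that they print readably.

```python
from itertools import chain

nums = [1, 2, 3, 4, 5]

# map-like: exactly one output element per input element (here, one list each).
mapped = [list(range(1, x)) for x in nums]

# flatMap-like: the per-element results are concatenated into one flat sequence.
flat_mapped = list(chain.from_iterable(range(1, x) for x in nums))

print(mapped)       # [[], [1], [1, 2], [1, 2, 3], [1, 2, 3, 4]]
print(flat_mapped)  # [1, 1, 2, 1, 2, 3, 1, 2, 3, 4]
```

Note that the input `1` contributes nothing to the flattened result, because `range(1, 1)` is empty.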

### distinct
* This will return the distinct elements of an RDD, with duplicates removed.

In [None]:
rdd1 = sc.parallelize([10, 11, 10, 11, 12, 11])
dist_rdd = rdd1.distinct()
dist_rdd.collect()

[10, 12, 11]

### reduceByKey
* This function merges the values of each key in an RDD of (key, value) pairs, using the function passed to reduceByKey.

In [None]:
pairs = [ ("a", 5), ("b", 7), ("c", 2), ("a", 3), ("b", 1), ("c", 4)]
pair_rdd = sc.parallelize(pairs)

output = pair_rdd.reduceByKey(lambda x, y : x + y)

result = output.collect()
print(*result, sep='\n')

('b', 8)
('c', 6)
('a', 8)
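
As a rough mental model, reduceByKey can be pictured as keeping one running value per key. The sketch below is plain Python on a single machine; the helper `reduce_by_key` is hypothetical and is not part of PySpark.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func (a local model of reduceByKey)."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


pairs = [("a", 5), ("b", 7), ("c", 2), ("a", 3), ("b", 1), ("c", 4)]
totals = reduce_by_key(pairs, lambda x, y: x + y)

# Sorted only to get a stable display order.
print(sorted(totals.items()))  # [('a', 8), ('b', 8), ('c', 6)]
```

In Spark the function is applied both within and across partitions, so it should be associative and commutative, as addition is here.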


### groupByKey
* This is another ByKey function that operates on an RDD of (key, value) pairs, but it only groups the values by key. Conceptually, it performs only the first step of reduceByKey: the values are grouped but not combined.

In [None]:
grp_out = pair_rdd.groupByKey()
grp_out.collect()

[('b', <pyspark.resultiterable.ResultIterable at 0x7ff1fbfeb070>),
 ('c', <pyspark.resultiterable.ResultIterable at 0x7ff1fbe85de0>),
 ('a', <pyspark.resultiterable.ResultIterable at 0x7ff1fbe85c00>)]
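
The output above shows iterable objects rather than the grouped values themselves. In PySpark, one common way to see the values is `pair_rdd.groupByKey().mapValues(list).collect()`. The plain-Python sketch below models the same idea locally and shows how reducing each group afterwards gives the reduceByKey result; it is an illustration only, not Spark's implementation.

```python
from collections import defaultdict
from functools import reduce

pairs = [("a", 5), ("b", 7), ("c", 2), ("a", 3), ("b", 1), ("c", 4)]

# Step 1 (groupByKey-like): collect the values of each key.
groups = defaultdict(list)
for key, value in pairs:
    groups[key].append(value)
grouped = dict(groups)

# Step 2 (what reduceByKey adds): fold each group down to one value.
reduced = {key: reduce(lambda x, y: x + y, values) for key, values in grouped.items()}

print(sorted(grouped.items()))  # [('a', [5, 3]), ('b', [7, 1]), ('c', [2, 4])]
print(sorted(reduced.items()))  # [('a', 8), ('b', 8), ('c', 6)]
```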

### sortByKey
* This function sorts an RDD of (key, value) pairs by key. By default, the sort is in ascending order.

In [None]:
pairs = [ ("a", 5), ("d", 7), ("c", 2), ("b", 3)]
raw_rdd = sc.parallelize(pairs)

sortkey_rdd = raw_rdd.sortByKey()
result = sortkey_rdd.collect()
print(*result,sep='\n')

# To sort in descending order, pass ascending=False.

('a', 5)
('b', 3)
('c', 2)
('d', 7)


### sortBy
* sortBy is a more general sorting function: it orders the elements by the value returned by a function you supply.

In [None]:
# Create RDD.
pairs = [ ("a", 5, 10), ("d", 7, 12), ("c", 2, 11), ("b", 3, 9)]
raw_rdd = sc.parallelize(pairs)

# Let’s try to do the sorting based on the 3rd element of the tuple.
sort_out = raw_rdd.sortBy(lambda x : x[2])
result = sort_out.collect()
print(*result, sep='\n')

('b', 3, 9)
('a', 5, 10)
('c', 2, 11)
('d', 7, 12)
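
The relationship between the two sorting functions can be sketched with Python's built-in `sorted`, which takes a key function in much the same way as sortBy. This is a local analogy on the same sample tuples, not Spark code; the PySpark calls named in the comments are the ones each line is meant to mirror.

```python
records = [("a", 5, 10), ("d", 7, 12), ("c", 2, 11), ("b", 3, 9)]

# Like sortBy(lambda x: x[2]): order by the third field, ascending.
by_third = sorted(records, key=lambda x: x[2])

# Like sortBy(lambda x: x[2], ascending=False): same key, descending.
by_third_desc = sorted(records, key=lambda x: x[2], reverse=True)

# Ordering by the first field, which is what sortByKey does for (key, value) pairs.
by_first = sorted(records, key=lambda x: x[0])

print(by_third)       # [('b', 3, 9), ('a', 5, 10), ('c', 2, 11), ('d', 7, 12)]
print(by_third_desc)  # [('d', 7, 12), ('c', 2, 11), ('a', 5, 10), ('b', 3, 9)]
print(by_first)       # [('a', 5, 10), ('b', 3, 9), ('c', 2, 11), ('d', 7, 12)]
```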


# Actions

* Actions are RDD operations that trigger execution of the pending transformations immediately. While transformations return another RDD, actions return native data structures to the driver.

### count
* This will count the number of items in the given RDD.

In [None]:
num = sc.parallelize([1,2,3,4,2])
num.count()

5

### first
* This will return the first element of the given RDD.

In [None]:
num.first()

1

### collect
* This will return all elements for the given RDD.


In [None]:
num.collect()

[1, 2, 3, 4, 2]

**We should not use the collect operation when working with large data sets**, because it returns all the data distributed across the cluster's workers to the driver. All of that data has to travel over the network from the workers to the driver, and the driver has to hold it all in memory. This hurts the performance of your application and can exhaust the driver's memory.

### take
* This will return the first n elements of the RDD, where n is the number specified.

In [None]:
num.take(3)

[1, 2, 3]
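
One reason take is usually preferred over collect for a quick look at the data is that it only needs as many elements as were asked for. The plain-Python sketch below illustrates that idea with a generator and `itertools.islice`; it is a loose analogy, and the names `source` and `produced` are invented for the example.

```python
from itertools import islice

produced = []  # records which elements were actually generated


def source():
    for x in [1, 2, 3, 4, 2]:
        produced.append(x)
        yield x


# take(3)-like: stop pulling elements once three have been obtained.
first_three = list(islice(source(), 3))

print(first_three)  # [1, 2, 3]
print(produced)     # [1, 2, 3]
```

The remaining elements are never generated, whereas a collect-like `list(source())` would materialize all five.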

In [None]:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
union_rdd = rdd1.union(rdd2)
print("Example for Union:", union_rdd.collect())



Example for Union: [1, 2, 3, 3, 4, 5]


In [None]:
intersection_rdd = rdd1.intersection(rdd2)
print("Example for intersection:", intersection_rdd.collect())


Example for intersection: [3]


In [None]:
distinct_rdd = union_rdd.distinct()
print("Example for distinct:", distinct_rdd.collect())


Example for distinct: [4, 1, 5, 2, 3]


In [None]:
cartesian_rdd = rdd1.cartesian(rdd2)
print("Example for cartesian:", cartesian_rdd.collect())


Example for cartesian: [(1, 3), (1, 4), (1, 5), (2, 3), (3, 3), (2, 4), (2, 5), (3, 4), (3, 5)]


In [None]:
zero_value = (0, 0)
seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)
comb_op = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
average_result = num_rdd.aggregate(zero_value, seq_op, comb_op)
print("Example agg:", average_result[0] / average_result[1])

Example agg: 3.0
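
The roles of the three arguments to aggregate can be traced by hand. The sketch below is a plain-Python model that assumes the five numbers are split into two partitions; the split is hypothetical, since Spark decides the actual partitioning.

```python
from functools import reduce

zero_value = (0, 0)
seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)
comb_op = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])

# Hypothetical split of [1, 2, 3, 4, 5] into two partitions.
partitions = [[1, 2], [3, 4, 5]]

# seq_op folds the elements of each partition into a (sum, count) pair.
partials = [reduce(seq_op, part, zero_value) for part in partitions]

# comb_op merges the per-partition pairs into one overall pair.
total, count = reduce(comb_op, partials, zero_value)

print(partials)       # [(3, 2), (12, 3)]
print(total / count)  # 3.0
```

The zero value is the starting accumulator in both stages, which is why it should be neutral for both operations, as `(0, 0)` is for these sums.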


In [None]:
# Sample data: (date, temperature)
temperature_data = [
    ("2022-01-01", 25.0),
    ("2022-01-02", 28.5),
    ("2022-02-01", 22.0),
    ("2022-02-02", 24.5),
    ("2022-03-01", 18.0),
    ("2022-03-02", 20.5),
    # Add more data as needed
]

# Create an RDD from the sample data
temperature_rdd = sc.parallelize(temperature_data)

# Extract month and temperature as key-value pairs
month_temperature_rdd = temperature_rdd.map(lambda x: (x[0][:7], (x[1], 1)))

# Aggregate temperatures and counts by month
sum_counts_by_month = month_temperature_rdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Calculate average temperature for each month
average_temperature_by_month = sum_counts_by_month.mapValues(lambda x: x[0] / x[1])

# Collect and print the result
result = average_temperature_by_month.collect()
for month, average_temp in result:
    print(f"Month: {month}, Average Temperature: {average_temp:.2f}°C")

# Spark is not stopped here because the next cell still uses sc.


Month: 2022-03, Average Temperature: 19.25°C
Month: 2022-01, Average Temperature: 26.75°C
Month: 2022-02, Average Temperature: 23.25°C


In [None]:
# Read a text file into an RDD with one element per line.
# Sample.txt must exist in the working directory.
text_rdd = sc.textFile("Sample.txt")

# Perform transformations and actions
word_counts = (text_rdd.flatMap(lambda line: line.split())
                         .map(lambda word: (word, 1))
                         .reduceByKey(lambda a, b: a + b)
                         .sortByKey())

# Display result
print("Word Counts:")
for word, count in word_counts.collect():
    print(f"{word}: {count}")

# Stop Spark once all cells have run
sc.stop()