# RDD

An RDD (Resilient Distributed Dataset) is a fundamental data structure in Apache Spark, representing an immutable, distributed collection of objects that can be processed in parallel across a cluster.

**Key Features of RDD**:

 - Immutability: Once created, an RDD cannot be modified. Operations on an RDD (like transformations) result in the creation of a new RDD.
   
 - Distributed: RDDs are distributed across multiple nodes in a cluster, allowing for parallel processing.
 - Fault Tolerant: RDDs are resilient to node failures. Spark automatically recomputes lost data by keeping track of the transformations used to create the RDD (using lineage information).
 - Lazy Evaluation: Transformations on RDDs are not executed immediately. Instead, they are recorded in a DAG (Directed Acyclic Graph) of transformations, and the computation is triggered only when an action (such as collect, count, or saveAsTextFile) is called.
 - In-memory Processing: RDDs can be cached in memory (with `cache()` or `persist()`), which speeds up workloads that reuse the same data compared with rereading it from disk each time.
   
**Types of Operations on RDD:**

 - Transformations: These are operations that create a new RDD from an existing one, such as map, filter, flatMap, reduceByKey, etc. They are lazy, meaning they do not compute results immediately.
 - Actions: These are operations that trigger the actual computation and return a result to the driver program or write data to an external storage system. Examples include collect, count, reduce, saveAsTextFile, etc.
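The split between lazy transformations and eager actions can be illustrated without Spark. The sketch below is a toy analogy in plain Python: `LazyPipeline` is a hypothetical class invented for this example, not a Spark API, and it only mimics the idea of recording steps and running them when `collect()` is called.

```python
class LazyPipeline:
    """Toy stand-in for an RDD lineage (hypothetical, not a Spark class)."""

    def __init__(self, source, steps=()):
        self.source = source
        self.steps = tuple(steps)

    def map(self, func):
        # Transformation: record the step and return a NEW pipeline.
        return LazyPipeline(self.source, self.steps + (("map", func),))

    def filter(self, pred):
        # Transformation: record the step and return a NEW pipeline.
        return LazyPipeline(self.source, self.steps + (("filter", pred),))

    def collect(self):
        # Action: only now are the recorded steps actually executed.
        data = list(self.source)
        for kind, func in self.steps:
            if kind == "map":
                data = [func(x) for x in data]
            else:
                data = [x for x in data if func(x)]
        return data


calls = []

def square(x):
    calls.append(x)  # record every real invocation
    return x * x

base = LazyPipeline([1, 2, 3, 4, 5])
squared = base.map(square)
evens = squared.filter(lambda x: x % 2 == 0)

calls_before_action = len(calls)  # nothing has run yet
result = evens.collect()          # triggers the whole chain
print(calls_before_action, result, len(calls))
```

Note that `base` is left untouched by `map` and `filter`, which mirrors the immutability point above: each transformation yields a new object that remembers how it was derived.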

In [1]:
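# Start a SparkContext
from pyspark import SparkConf, SparkContext

# Note: with a local[*] master everything runs inside a single JVM, so the
# spark.executor.* settings below are expected to have little or no effect;
# they become relevant when the same configuration is used on a cluster.
conf = SparkConf() \
    .setAppName("Khan") \
    .setMaster('local[*]') \
    .set("spark.executor.instances", "3") \
    .set("spark.driver.memory", "1g") \
    .set("spark.executor.memory", "1g") \
    .set("spark.executor.cores", "1")

# The context is named `spark` throughout this notebook.
spark = SparkContext(conf=conf)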

24/09/13 20:58:47 WARN Utils: Your hostname, nooman resolves to a loopback address: 127.0.1.1; using 192.168.0.104 instead (on interface enp4s0)
24/09/13 20:58:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/13 20:58:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Print the effective configuration
print(spark.getConf().toDebugString())

spark.app.id=local-1726241328748
spark.app.name=Khan
spark.app.startTime=1726241327956
spark.app.submitTime=1726241327834
spark.driver.extraJavaOptions=-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandl

In [3]:
collect_rdd = spark.parallelize([1,2,3,4,5])
collect_rdd.collect()

[1, 2, 3, 4, 5]

In [4]:
collect_rdd.count()

5

In [5]:
rdd = spark.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd.first()

1

In [6]:
rdd.max()

10

In [7]:
rdd.min()

1

In [8]:
rdd.map(lambda x:x**2).collect()

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

In [9]:
from operator import add
rdd.reduce(add)

55

In [10]:
rdd.reduce(lambda x, y:x + y)

55

In [11]:
rdd.reduce(lambda x, y :x - y)

-23
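The `-23` above deserves a second look: subtraction is neither associative nor commutative, so the result of `reduce` depends on how the data happens to be split into partitions. The sketch below is plain Python rather than Spark; `slice_into_partitions` and `partitioned_reduce` are hypothetical helpers that assume contiguous, evenly sized slices and a left-to-right merge of the per-partition results.

```python
from functools import reduce

def slice_into_partitions(data, num_slices):
    """Split a list into contiguous slices (hypothetical helper)."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

def partitioned_reduce(data, num_slices, func):
    """Reduce each non-empty slice, then reduce the partial results in order."""
    parts = slice_into_partitions(data, num_slices)
    partials = [reduce(func, part) for part in parts if part]
    return reduce(func, partials)

data = list(range(1, 11))

def subtract(x, y):
    return x - y

def add(x, y):
    return x + y

# Same data, same function, different partition counts.
subtraction_results = {n: partitioned_reduce(data, n, subtract) for n in (1, 2, 8)}
addition_results = {n: partitioned_reduce(data, n, add) for n in (1, 2, 8)}

print(subtraction_results)  # varies with the number of slices
print(addition_results)     # 55 for every slicing
```

Under these assumptions an 8-way split reproduces the `-23` shown above, a single slice gives `-53`, and addition gives `55` regardless. This is why functions passed to `reduce` should be associative and commutative.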

In [12]:
import numpy as np
x = rdd.map(lambda x: np.sqrt(x))
x.collect()

[np.float64(1.0),
 np.float64(1.4142135623730951),
 np.float64(1.7320508075688772),
 np.float64(2.0),
 np.float64(2.23606797749979),
 np.float64(2.449489742783178),
 np.float64(2.6457513110645907),
 np.float64(2.8284271247461903),
 np.float64(3.0),
 np.float64(3.1622776601683795)]

In [13]:
marks = {
'maths' : np.random.randint(1, 10, 10),
'physics' : np.random.randint(1, 10, 10),
'python' : np.random.randint(1, 10, 10),
'java' : np.random.randint(1, 10, 10)
}

print(marks)

# find the average marks for each subject
marks_rdd = spark.parallelize(marks.items())
marks_rdd.map(lambda x:(x[0], np.mean(x[1]))).collect()

{'maths': array([7, 3, 7, 9, 2, 1, 7, 1, 3, 9]), 'physics': array([5, 1, 7, 8, 3, 4, 6, 8, 5, 4]), 'python': array([6, 5, 9, 7, 4, 4, 1, 5, 1, 6]), 'java': array([4, 5, 5, 8, 7, 2, 3, 5, 8, 6])}


[('maths', np.float64(4.9)),
 ('physics', np.float64(5.1)),
 ('python', np.float64(4.8)),
 ('java', np.float64(5.3))]

#### Higher-Order Functions in PySpark

#### map

Applies a given function to each element of the RDD and returns a new RDD with the results.

In [14]:
rdd = spark.parallelize([1, 2, 3, 4])
rdd.map(lambda x: x**2).collect()

[1, 4, 9, 16]

#### flatMap

Similar to map, but allows the function to return multiple values (e.g., lists or iterables) that are flattened into a single RDD.

In [15]:
rdd = spark.parallelize(["Hello world", "spark is great"])
rdd.map(lambda x: x.split(" ")).collect()

[['Hello', 'world'], ['spark', 'is', 'great']]

In [16]:
rdd = spark.parallelize(["Hello world", "spark is great"])
rdd.flatMap(lambda x: x.split(" ")).collect()

['Hello', 'world', 'spark', 'is', 'great']

#### filter

Returns a new RDD containing only the elements that satisfy a given condition.

In [17]:
rdd = spark.parallelize([1, 2, 3, 4])
rdd.filter(lambda x:x %2 ==0).collect()

[2, 4]

#### reduce

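Aggregates all elements of the RDD with a binary function and returns a single value to the driver. The function should be associative and commutative.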

In [18]:
rdd = spark.parallelize([1, 2, 3, 4])
rdd.reduce(lambda x, y: x + y)

10

#### reduceByKey()

Aggregates values of the same key using a specified associative and commutative reduce function.

In [19]:
rdd = spark.parallelize([("a", 2), ("b", 5), ("c", 9), ("c", 10)])
rdd.reduceByKey(lambda x, y: x+y).collect()

[('c', 19), ('a', 2), ('b', 5)]

#### groupByKey()

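Groups the values of each key into a single iterable, usually as a step before aggregating them (for example with `mapValues(sum)`). For simple sums or counts, `reduceByKey` is generally preferable because it combines values within each partition before shuffling.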

In [20]:
rdd = spark.parallelize([("a", 2), ("b", 5), ("c", 9), ("c", 10)])
rdd.groupByKey().mapValues(set).collect()

[('c', {9, 10}), ('a', {2}), ('b', {5})]

In [21]:
rdd = spark.parallelize([("a", 2), ("b", 5), ("c", 9), ("c", 10)])
rdd.groupByKey().mapValues(len).collect()

[('c', 2), ('a', 1), ('b', 1)]

In [24]:
rdd = spark.parallelize([("a", 2), ("b", 5), ("c", 9), ("c", 10)])
rdd.groupByKey().mapValues(sum).collect()

[('c', 19), ('a', 2), ('b', 5)]

#### groupBy()

In [25]:
rdd = spark.parallelize([1,1,2,4,5,6,1,4,5,6,3,2,1,4,5])
rdd.groupBy(lambda x:x).mapValues(list).collect()

[(1, [1, 1, 1, 1]),
 (2, [2, 2]),
 (3, [3]),
 (4, [4, 4, 4]),
 (5, [5, 5, 5]),
 (6, [6, 6])]

In [26]:
rdd = spark.parallelize([1,1,2,4,5,6,1,4,5,6,3,2,1,4,5])
rdd.groupBy(lambda x:x).mapValues(sum).collect()

[(1, 4), (2, 4), (3, 3), (4, 12), (5, 15), (6, 12)]

In [27]:
rdd = spark.parallelize([1,1,2,4,5,6,1,4,5,6,3,2,1,4,5])
rdd.groupBy(lambda x:x%2 == 0).mapValues(list).collect()

[(False, [1, 1, 5, 1, 5, 3, 1, 5]), (True, [2, 4, 6, 4, 6, 2, 4])]

In [28]:
rdd = spark.parallelize([1,1,2,4,5,6,1,4,5,6,3,2,1,4,5])
rdd.groupBy(lambda x:x%2 == 0).mapValues(sum).collect()

[(False, 22), (True, 28)]

#### mapPartitions()

In [29]:
def process_partitions(iterator):
    return (x**2 for x in iterator)

rdd = spark.parallelize([1, 2, 3, 4, 5, 6], 5)
print(rdd.glom().collect())
rdd.mapPartitions(process_partitions).collect()

[[1], [2], [3], [4], [5, 6]]


[1, 4, 9, 16, 25, 36]
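The practical difference from `map` is that the function passed to `mapPartitions` is called once per partition and receives an iterator over that partition's elements. The following plain-Python sketch illustrates this with the partition layout printed above; `map_partitions` is a hypothetical helper written for this example, not the Spark method itself.

```python
from itertools import chain

# Partition layout copied from the glom() output above.
partitions = [[1], [2], [3], [4], [5, 6]]

invocations = []

def process_partition(iterator):
    # Runs once per partition, so any per-partition setup would go here.
    invocations.append("called")
    return (x ** 2 for x in iterator)

def map_partitions(parts, func):
    """Apply func to each partition's iterator and flatten the results."""
    return list(chain.from_iterable(func(iter(part)) for part in parts))

squares = map_partitions(partitions, process_partition)
print(squares, len(invocations))
```

Six elements are processed with only five function calls, one per partition. This is why `mapPartitions` is often chosen when each call needs costly setup, such as opening a connection.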

#### aggregateByKey()

In [30]:
rdd = spark.parallelize([("a", 1), ("b", 2), ("a", 3)])
result_rdd = rdd.aggregateByKey((0, 0), 
                                lambda acc, value: (acc[0] + value, acc[1] + 1),
                                lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
result_rdd.collect()

[('a', (4, 2)), ('b', (2, 1))]

#### combineByKey()

In [31]:
def create_combiner(value):
    return (value, 1)

def merge_value(acc, value):
    return (acc[0] + value, acc[1] + 1)

def merge_combiners(acc1, acc2):
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

rdd = spark.parallelize([("a", 1), ("b", 2), ("a", 3)])
combined_rdd = rdd.combineByKey(create_combiner, merge_value, merge_combiners)
combined_rdd.collect()

[('a', (4, 2)), ('b', (2, 1))]
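Both `aggregateByKey` and `combineByKey` follow the same two-phase pattern: build an accumulator per key inside each partition, then merge the accumulators across partitions. The sketch below imitates that pattern in plain Python. `combine_by_key` is a hypothetical helper, and the two-partition layout is an assumption chosen for illustration.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Toy two-phase aggregation (hypothetical helper, not the Spark method)."""
    # Phase 1: build one accumulator per key inside each partition.
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)

    # Phase 2: merge the per-partition accumulators key by key.
    merged = {}
    for combiners in per_partition:
        for key, acc in combiners.items():
            merged[key] = merge_combiners(merged[key], acc) if key in merged else acc
    return merged

# Assumed layout: the three records spread over two partitions.
partitions = [[("a", 1), ("b", 2)], [("a", 3)]]

totals = combine_by_key(
    partitions,
    lambda value: (value, 1),                          # first value seen for a key
    lambda acc, value: (acc[0] + value, acc[1] + 1),   # fold a value into (sum, count)
    lambda a, b: (a[0] + b[0], a[1] + b[1]),           # merge two (sum, count) pairs
)

# The (sum, count) pairs make a per-key average easy to derive.
averages = {key: total / count for key, (total, count) in totals.items()}
print(totals, averages)
```

The `(sum, count)` accumulator is the usual reason to reach for these operations: an average cannot be computed with `reduceByKey` on the raw values alone, because the accumulator has a different type from the input values.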

#### flatMapValues

In [32]:
rdd = spark.parallelize([("a", [1, 2]), ("b", [3, 4])])
flat_mapped_values_rdd = rdd.flatMapValues(lambda x: x)
flat_mapped_values_rdd.collect()

[('a', 1), ('a', 2), ('b', 3), ('b', 4)]

#### sample()

In [33]:
rdd = spark.parallelize([1, 2, 3, 4, 5, 7, 8, 4, 2, 4, 5, 9])
rdd.sample(withReplacement=False, fraction=0.7).collect()

[2, 4, 5, 7, 8, 2, 4, 5, 9]
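Note that `fraction` is a probability, not an exact size: without replacement, each element is kept independently with that probability, so the output above has 9 of 12 elements rather than exactly 70%. The plain-Python sketch below imitates this behaviour; `bernoulli_sample` is a hypothetical helper, and the seeds are arbitrary.

```python
import random

def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction` (hypothetical helper)."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]

data = [1, 2, 3, 4, 5, 7, 8, 4, 2, 4, 5, 9]

# Sample sizes vary from run to run unless the seed is fixed.
sizes = [len(bernoulli_sample(data, 0.7, seed)) for seed in range(200)]
mean_size = sum(sizes) / len(sizes)

# The same seed always yields the same sample.
repeatable = bernoulli_sample(data, 0.7, 42) == bernoulli_sample(data, 0.7, 42)
print(min(sizes), max(sizes), mean_size, repeatable)
```

In PySpark, `sample` accepts an optional `seed` argument for the same purpose, which is useful when a result needs to be reproducible.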

#### Word Count

In [35]:
with open('/home/nooman/Documents/WordCount.txt', "rb") as file:
    words = file.read().strip()

In [None]:
print(words)

In [36]:
word_count = spark.textFile('/Words/WordCount.txt')
word_count.flatMap(lambda x: x.strip().split(" ")).groupBy(lambda x: x).mapValues(len).collect()

[('Noah', 19), ('Will', 19), ('Alex', 19)]

In [37]:
word_count = spark.textFile('/Words/WordCount.txt')
word_count.flatMap(lambda x: x.strip().split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a+b).collect()

[('Noah', 19), ('Will', 19), ('Alex', 19)]

In [38]:
word_count.getNumPartitions()

2

In [39]:
word_count.glom().collect()

[['Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex '],
 ['Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ',
  'Noah Will Alex ']]
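The two word-count variants above return the same answer but move different amounts of data. With `groupBy`, every word is shuffled as its own record, whereas `reduceByKey` first combines counts inside each partition. The sketch below is a plain-Python estimate based on the partition layout printed above; it counts records only and is an illustration, not a measurement of Spark's actual shuffle.

```python
from collections import Counter

# Partition layout copied from the glom() output above: 10 lines and 9 lines.
partitions = [["Noah Will Alex "] * 10, ["Noah Will Alex "] * 9]

def words_of(lines):
    """Split every line into words, as the flatMap step does."""
    return [word for line in lines for word in line.strip().split(" ")]

# groupBy-style: each word occurrence crosses the shuffle as a separate record.
records_group_by = sum(len(words_of(part)) for part in partitions)

# reduceByKey-style: each partition pre-aggregates to one record per distinct word.
partials = [Counter(words_of(part)) for part in partitions]
records_reduce_by_key = sum(len(counter) for counter in partials)

totals = sum(partials, Counter())
print(records_group_by, records_reduce_by_key, dict(totals))
```

For this small file the difference is 57 records versus 6. The gap grows with the size of the data, which is why `reduceByKey` is usually the better choice for counting.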

#### Data Analysis with RDD

In [40]:
### load customer data from hdfs
customers = spark.textFile("/Ecommerce/Customers/Customers.csv")
customers.sample(withReplacement=False, fraction=0.01).collect()

['57156,Jesse,Nguyen,1976-12-15 00:00:00,Bremen,Bremen,Germany,148076,7172974188,Jesse.Nguyen@gmail.com,2020-04-13 00:00:00',
 '57189,Sarah,Hunt,1978-05-24 00:00:00,Zurich,Zurich,Switzerland,598317,8169119239,Sarah.Hunt@gmail.com,2020-05-20 00:00:00',
 '57280,Kayla,Grey,1952-05-17 00:00:00,Lucknow,Uttar Pradesh,India,753596,6528224522,Kayla.Grey@gmail.com,2020-11-09 00:00:00',
 '57358,Theodore,Lee,1959-10-06 00:00:00,Vienna,Vienna,Austria,789084,9235126073,Theodore.Lee@gmail.com,2021-02-15 00:00:00',
 '57487,Findlay,Kade,1978-04-28 00:00:00,Brandenburg,Potsdam,Germany,714481,8899015011,Findlay.Kade@gmail.com,2021-06-21 00:00:00',
 '57596,Stanley,Ali,1978-10-10 00:00:00,Zurich,Zurich,Switzerland,655505,9080686603,Stanley.Ali@gmail.com,2021-12-02 00:00:00']

In [41]:
customers.getNumPartitions()

2

In [42]:
### split each row of the csv file into its fields
customers = customers.map(lambda x:x.split(","))

In [43]:
### first rows after splitting (take avoids pulling the whole dataset to the driver)
customers.take(4)

[['CustomerID',
  'FirstName',
  'LastName',
  'Date_of_Birth',
  'City',
  'State',
  'Country',
  'PostalCode',
  'Phone',
  'Email',
  'DateEntered'],
 ['57081',
  'James',
  'Smith',
  '1987-03-26 00:00:00',
  'New York',
  'New York',
  'United States',
  '280862',
  '9638483934',
  'James.Smith@gmail.com',
  '2020-01-02 00:00:00'],
 ['57082',
  'Robert',
  'Downey Jr',
  '1973-05-24 00:00:00',
  'New York',
  'New York',
  'United States',
  '376573',
  '6588282115',
  'Robert.Downey Jr@gmail.com',
  '2020-01-06 00:00:00'],
 ['57083',
  'John',
  'Williams',
  '1990-04-14 00:00:00',
  'Chicago',
  'Illinois',
  'United States',
  '485629',
  '7641021429',
  'John.Williams@gmail.com',
  '2020-01-11 00:00:00']]
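Splitting on `","` works for this file because no field contains a comma. If a field were quoted and contained one, a naive split would produce too many columns. A minimal sketch using Python's standard `csv` module is shown below; `parse_csv_line` is a hypothetical helper, and the quoted row is invented for illustration and does not come from the dataset.

```python
import csv

def parse_csv_line(line):
    """Parse one CSV line, honouring quoted fields (hypothetical helper)."""
    return next(csv.reader([line]))

# A row taken from the output above: both approaches agree.
plain = ("57083,John,Williams,1990-04-14 00:00:00,Chicago,Illinois,"
         "United States,485629,7641021429,John.Williams@gmail.com,"
         "2020-01-11 00:00:00")

# Invented row with a comma inside a quoted field.
quoted = '1,"Smith, Jr.",New York'

print(parse_csv_line(plain) == plain.split(","))
print(quoted.split(","))        # naive split breaks the name in two
print(parse_csv_line(quoted))   # csv keeps the quoted field intact
```

A function like this could be passed to `map` in place of the `split(",")` lambda, at the cost of some extra parsing work per line.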

In [44]:
# header row of our data
header = customers.first()

In [45]:
# remove header from actual data
customers = customers.filter(lambda x: x != header)

In [46]:
customers.count()

525

In [47]:
list(enumerate(header))

[(0, 'CustomerID'),
 (1, 'FirstName'),
 (2, 'LastName'),
 (3, 'Date_of_Birth'),
 (4, 'City'),
 (5, 'State'),
 (6, 'Country'),
 (7, 'PostalCode'),
 (8, 'Phone'),
 (9, 'Email'),
 (10, 'DateEntered')]

In [48]:
### assign the corresponding column name to each field of a row
from collections import namedtuple
from datetime import datetime

# Record type with one attribute per header column
column = namedtuple('column', header)

def parseRecs(fields):
    # `fields` is an already-split row; the two date columns are trimmed
    # to their first 10 characters (YYYY-MM-DD), dropping the time part.
    return column( fields[0], fields[1], fields[2], fields[3][:10],  fields[4],
                  fields[5], fields[6], fields[7], fields[8], fields[9], fields[10][:10]
                 )

customers = customers.map(parseRecs)
customers.take(5)

[column(CustomerID='57081', FirstName='James', LastName='Smith', Date_of_Birth='1987-03-26', City='New York', State='New York', Country='United States', PostalCode='280862', Phone='9638483934', Email='James.Smith@gmail.com', DateEntered='2020-01-02'),
 column(CustomerID='57082', FirstName='Robert', LastName='Downey Jr', Date_of_Birth='1973-05-24', City='New York', State='New York', Country='United States', PostalCode='376573', Phone='6588282115', Email='Robert.Downey Jr@gmail.com', DateEntered='2020-01-06'),
 column(CustomerID='57083', FirstName='John', LastName='Williams', Date_of_Birth='1990-04-14', City='Chicago', State='Illinois', Country='United States', PostalCode='485629', Phone='7641021429', Email='John.Williams@gmail.com', DateEntered='2020-01-11'),
 column(CustomerID='57084', FirstName='Michael', LastName='Johnson', Date_of_Birth='1953-03-25', City='Brisbane', State='Queensland', Country='Australia', PostalCode='260866', Phone='7354232181', Email='Michael.Johnson@gmail.com', 

In [49]:
#### keep only customers from India
customers.filter(lambda x:x.Country == 'India').take(4)

[column(CustomerID='57204', FirstName='Melissa', LastName='Clooney', Date_of_Birth='1991-12-31', City='Gurgaon', State='Haryana', Country='India', PostalCode='128305', Phone='8287186662', Email='Melissa.Clooney@gmail.com', DateEntered='2020-06-10'),
 column(CustomerID='57213', FirstName='Shirley', LastName='Taylor', Date_of_Birth='1993-05-23', City='Gurgaon', State='Haryana', Country='India', PostalCode='710807', Phone='8301878470', Email='Shirley.Taylor@gmail.com', DateEntered='2020-06-27'),
 column(CustomerID='57217', FirstName='Brenda', LastName='Williams', Date_of_Birth='1951-04-13', City='Varanasi', State='Uttar Pradesh', Country='India', PostalCode='564691', Phone='7440859268', Email='Brenda.Williams@gmail.com', DateEntered='2020-07-02'),
 column(CustomerID='57218', FirstName='Pamela', LastName='Johnson', Date_of_Birth='1969-04-18', City='Gurgaon', State='Haryana', Country='India', PostalCode='681558', Phone='9952427264', Email='Pamela.Johnson@gmail.com', DateEntered='2020-07-03'

In [50]:
### number of customers per country, largest first
customers.map(lambda x:(x.Country, 1)).reduceByKey(lambda a, b: a+b).sortBy(lambda x:x[1], ascending=False).collect()

[('United States', 66),
 ('India', 40),
 ('Portugal', 37),
 ('France', 35),
 ('Switzerland', 33),
 ('Austria', 32),
 ('Australia', 30),
 ('Northern Ireland', 30),
 ('Germany', 29),
 ('Greece', 28),
 ('Ireland', 27),
 ('Belgium', 27),
 ('Russia', 24),
 ('Netherlands', 24),
 ('Romania', 20),
 ('Poland', 19),
 ('Italy', 14),
 ('New Zealand', 5),
 ('South Africa', 5)]

In [74]:
# load orders data from hdfs
orders = spark.textFile("/Ecommerce/Orders/Orders.csv")
orders.take(5)

['OrderID,CustomerID,PaymentID,OrderDate,ShipperID,ShipDate,DeliveryDate,Total_order_amount',
 '7655500,57083,2,2020-01-12,7,2020-01-13,2020-01-19,25112.0',
 '7655501,57086,3,2020-01-20,2,2020-01-24,2020-01-27,22453.0',
 '7655502,57086,4,2020-02-06,7,2020-02-11,2020-02-21,13293.0',
 '7655503,57088,4,2020-02-09,1,2020-02-13,2020-02-26,16063.0']

In [75]:
orders = orders.map(lambda x: x.split(","))

In [76]:
header = orders.first()

In [77]:
header

['OrderID',
 'CustomerID',
 'PaymentID',
 'OrderDate',
 'ShipperID',
 'ShipDate',
 'DeliveryDate',
 'Total_order_amount']

In [78]:
orders = orders.filter(lambda x: x != header)

In [79]:
orders.take(4)

[['7655500',
  '57083',
  '2',
  '2020-01-12',
  '7',
  '2020-01-13',
  '2020-01-19',
  '25112.0'],
 ['7655501',
  '57086',
  '3',
  '2020-01-20',
  '2',
  '2020-01-24',
  '2020-01-27',
  '22453.0'],
 ['7655502',
  '57086',
  '4',
  '2020-02-06',
  '7',
  '2020-02-11',
  '2020-02-21',
  '13293.0'],
 ['7655503',
  '57088',
  '4',
  '2020-02-09',
  '1',
  '2020-02-13',
  '2020-02-26',
  '16063.0']]

In [80]:
len(header)

8

In [82]:
# Record type with one attribute per order column
order_columns = namedtuple('column', header)

def parseRecsOrders(fields):
    # Use the first 8 fields, matching the 8 header columns
    return order_columns(*fields[:8])

orders = orders.map(parseRecsOrders)

In [83]:
orders.getNumPartitions()

2

In [85]:
orders.take(2)

[column(OrderID='7655500', CustomerID='57083', PaymentID='2', OrderDate='2020-01-12', ShipperID='7', ShipDate='2020-01-13', DeliveryDate='2020-01-19', Total_order_amount='25112.0'),
 column(OrderID='7655501', CustomerID='57086', PaymentID='3', OrderDate='2020-01-20', ShipperID='2', ShipDate='2020-01-24', DeliveryDate='2020-01-27', Total_order_amount='22453.0')]

In [101]:
datetime.strptime("2020-01-12", "%Y-%m-%d")

datetime.datetime(2020, 1, 12, 0, 0)

In [112]:
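# find the monthly revenue
# Note: the key is the month number only, so the same month of
# different years is summed together.
from datetime import datetime
monthly_revenue = orders \
    .map(lambda x: (datetime.strptime(x.OrderDate, "%Y-%m-%d").month, float(x.Total_order_amount))) \
    .reduceByKey(lambda a, b: a + b) \
    .mapValues(lambda total: round(total, 3)) \
    .sortByKey().collect()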
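Because the key in the cell above is the month number alone, orders from the same month of different years would be added together. If per-year figures are wanted, one option is to key by `(year, month)` instead. The plain-Python sketch below shows the idea on the four order rows printed earlier; `year_month` is a hypothetical helper, and the loop stands in for the `map` and `reduceByKey` steps.

```python
from collections import defaultdict
from datetime import datetime

# (OrderDate, Total_order_amount) pairs copied from the orders shown above.
rows = [
    ("2020-01-12", "25112.0"),
    ("2020-01-20", "22453.0"),
    ("2020-02-06", "13293.0"),
    ("2020-02-09", "16063.0"),
]

def year_month(date_text):
    """Return (year, month) for a YYYY-MM-DD string (hypothetical helper)."""
    parsed = datetime.strptime(date_text, "%Y-%m-%d")
    return (parsed.year, parsed.month)

# Sum the amounts per (year, month) key.
revenue = defaultdict(float)
for order_date, amount in rows:
    revenue[year_month(order_date)] += float(amount)

# Round once at the end rather than inside the running sum.
monthly = sorted((key, round(total, 3)) for key, total in revenue.items())
print(monthly)
```

Rounding after the reduction, rather than inside the function given to `reduceByKey`, keeps that function associative, so the totals do not depend on the order in which partial sums are merged.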

In [115]:
orders.collect()

[column(OrderID='7655500', CustomerID='57083', PaymentID='2', OrderDate='2020-01-12', ShipperID='7', ShipDate='2020-01-13', DeliveryDate='2020-01-19', Total_order_amount='25112.0'),
 column(OrderID='7655501', CustomerID='57086', PaymentID='3', OrderDate='2020-01-20', ShipperID='2', ShipDate='2020-01-24', DeliveryDate='2020-01-27', Total_order_amount='22453.0'),
 column(OrderID='7655502', CustomerID='57086', PaymentID='4', OrderDate='2020-02-06', ShipperID='7', ShipDate='2020-02-11', DeliveryDate='2020-02-21', Total_order_amount='13293.0'),
 column(OrderID='7655503', CustomerID='57088', PaymentID='4', OrderDate='2020-02-09', ShipperID='1', ShipDate='2020-02-13', DeliveryDate='2020-02-26', Total_order_amount='16063.0'),
 column(OrderID='7655504', CustomerID='57090', PaymentID='4', OrderDate='2020-02-11', ShipperID='3', ShipDate='2020-02-15', DeliveryDate='2020-02-20', Total_order_amount='15193.0'),
 column(OrderID='7655505', CustomerID='57094', PaymentID='4', OrderDate='2020-02-15', Ship

While Spark DataFrames and Spark SQL offer higher-level abstractions and optimizations for data processing, RDDs still have their place in certain situations. Here is a breakdown of when to use RDDs versus DataFrames and Spark SQL.

**When to Use RDDs:**

 - Custom Transformations and Algorithms: When you need complex transformations or algorithms that are not easily expressed with the DataFrame API or SQL, and you want fine-grained control over custom processing logic (e.g., complex data aggregations, graph algorithms, machine learning model training).
 - Low-Level Control: When you require more control over data flow, partitioning, and storage formats. RDDs give more flexibility to tune storage and data distribution, which can help in specific performance-tuning scenarios.
 - Unstructured Data: When working with unstructured or semi-structured data that does not fit the relational model used by DataFrames. RDDs can hold any type of data, including complex objects, whereas DataFrames require a schema.
 - Functional Programming: When you prefer a functional style, or need functions like map(), flatMap(), and reduce() in cases where the DataFrame and SQL abstractions do not provide the required flexibility.
 - Binary Data Processing: When reading images, audio files, or other raw binary formats. RDDs allow direct access to bytes, which can be more suitable in such cases.
 - Backward Compatibility: When working with legacy code or projects built heavily around RDDs. It may be easier to keep using RDDs than to refactor the entire codebase to DataFrames or Datasets.

**When to Use DataFrames and Spark SQL:**

 - Performance Optimizations: DataFrames and Spark SQL benefit from the Catalyst optimizer and the Tungsten execution engine, which apply optimizations such as predicate pushdown and column pruning and produce more efficient execution plans automatically.
 - Structured and Semi-Structured Data: For data with a definable schema (CSV, Parquet, JSON, etc.), DataFrames and Spark SQL offer a more user-friendly API and SQL syntax.
 - Ease of Use and Readability: The DataFrame and SQL APIs are easier to write and read, especially for data analysts and engineers familiar with SQL. Their more concise syntax reduces code complexity.
 - Interoperability with SQL-based Systems: When you need to integrate with other SQL-based systems or run interactive queries. Spark SQL lets you use SQL directly, making it easy to interact with existing data warehouses or databases.
 - Machine Learning Pipelines: For building machine learning pipelines in Spark MLlib, DataFrames are the preferred API, since the pipeline components are DataFrame-based.

**Summary:**

 - Use RDDs when you need more control over low-level operations, handle unstructured data, or perform custom processing that isn't easily expressed using DataFrames or SQL.
 - Use DataFrames and Spark SQL when you prioritize performance, ease of use, and interoperability with SQL, and when you work with structured or semi-structured data.