In [3]:
from pyspark.sql import SparkSession
# Create (or reuse) the SparkSession that every later cell relies on.
spark = (
    SparkSession.builder
    .appName('RDD Operation')
    .getOrCreate()
)

In [36]:
# Sample customer records as raw CSV lines.
# Element 0 is the header row; every following element is one data row.
cust_data = ["customer_id,name,city,state,country,registration_date,is_active"] + [
    "0,Customer_0,Pune,Maharashtra,India,2023-06-29,False",
    "1,Customer_1,Bangalore,Tamil Nadu,India,2023-12-07,True",
    "2,Customer_2,Hyderabad,Gujarat,India,2023-10-27,True",
    "3,Customer_3,Bangalore,Karnataka,India,2023-10-17,False",
    "4,Customer_4,Ahmedabad,Karnataka,India,2023-03-14,False",
    "5,Customer_5,Hyderabad,Karnataka,India,2023-07-28,False",
]

In [6]:
# SparkContext taken from the session; it is the handle used below for the RDD API (parallelize, etc.).
sc = spark.sparkContext

In [7]:
# Distribute the in-memory list as an RDD of strings; note the header line is included as an element.
rdd = sc.parallelize(cust_data)

In [8]:
# Number of partitions parallelize() split cust_data into.
# NOTE(review): no numSlices was passed, so this presumably reflects the context's default parallelism — confirm.
rdd.getNumPartitions()

2

RDD - Resilient Distributed Dataset

In [16]:
# first() is an action - it returns the first element of the RDD to the driver.
# For this RDD that element is the CSV header line, kept so it can be filtered out next.
header = rdd.first()
header

'customer_id,name,city,state,country,registration_date,is_active'

In [17]:
# filter() is a transformation that keeps only elements matching a predicate;
# here every line equal to the header is dropped, leaving just the data rows.
data_rdd = rdd.filter(lambda line: line != header)
data_rdd.collect()

['0,Customer_0,Pune,Maharashtra,India,2023-06-29,False',
 '1,Customer_1,Bangalore,Tamil Nadu,India,2023-12-07,True',
 '2,Customer_2,Hyderabad,Gujarat,India,2023-10-27,True',
 '3,Customer_3,Bangalore,Karnataka,India,2023-10-17,False',
 '4,Customer_4,Ahmedabad,Karnataka,India,2023-03-14,False',
 '5,Customer_5,Hyderabad,Karnataka,India,2023-07-28,False']

In [21]:
# map() applies a function to each element of an RDD; parse_row is the function
# used in the next cell to turn every CSV line into a typed tuple.
def parse_row(row):
    """Parse one customer CSV line into a typed tuple.

    Args:
        row: A single data line with columns in the order
            customer_id,name,city,state,country,registration_date,is_active.

    Returns:
        Tuple ``(customer_id: int, name, city, state, country,
        registration_date: str, is_active: bool)``.

    Raises:
        ValueError: if the line does not contain exactly 7 fields, or if
            customer_id is not an integer.
    """
    import csv  # local import keeps this cell self-contained

    # csv.reader (unlike str.split) honours quoted fields such as "Doe, John".
    fields = next(csv.reader([row]), [])
    if len(fields) != 7:
        # Fail loudly instead of an opaque IndexError (too few fields) or
        # silently dropping data (too many fields).
        raise ValueError(f"expected 7 fields, got {len(fields)}: {row!r}")
    customer_id, name, city, state, country, registration_date, is_active = fields
    return (
        int(customer_id),
        name,
        city,
        state,
        country,
        registration_date,
        # Tolerate stray whitespace and casing ("true", "TRUE", "True ").
        is_active.strip().lower() == 'true',
    )

In [22]:
# Apply parse_row to every data line; map() is a transformation, so nothing runs until an action (first/collect).
parsed_rdd = data_rdd.map(parse_row)

In [35]:
# Sanity check: raw split of the first data line, to confirm the column positions parse_row indexes into.
first_row = data_rdd.first()
first_row.split(',')

['0', 'Customer_0', 'Pune', 'Maharashtra', 'India', '2023-06-29', 'False']

Advanced RDD Functions

In [40]:
# Extract fields with map(): project each parsed tuple down to (name, city),
# i.e. positions 1 and 2 of the tuple returned by parse_row.
name_city_rdd = parsed_rdd.map(lambda rec: rec[1:3])
name_city_rdd.first()

('Customer_0', 'Pune')

In [45]:
# filter() keeps only rows whose predicate is truthy - here the active customers
# (row[6] is the is_active bool produced by parse_row, so it is tested directly, not compared to True).
active_cust = parsed_rdd.filter(lambda row: row[6])
active_cust.collect()

[(1, 'Customer_1', 'Bangalore', 'Tamil Nadu', 'India', '2023-12-07', True),
 (2, 'Customer_2', 'Hyderabad', 'Gujarat', 'India', '2023-10-27', True)]

In [56]:
# distinct() de-duplicates the city names; the collected order is not the input order.
cities_rdd = (
    parsed_rdd
    .map(lambda rec: rec[2])
    .distinct()
)
cities_rdd.collect()

['Pune', 'Hyderabad', 'Bangalore', 'Ahmedabad']

In [50]:
# take(n) is an action returning the first n elements as a list; since cities_rdd comes from distinct(),
# which element comes "first" is not guaranteed.
# NOTE(review): this cell's execution count (50) precedes the cell above (56); re-run top-to-bottom to confirm output.
cities_rdd.take(1)

['Pune']

In [61]:
# reduceByKey() merges the values of each key with an associative function => transformation.
# Emit (city, 1) for every customer, then sum the ones per city.
cust_per_city1 = (
    parsed_rdd
    .map(lambda rec: (rec[2], 1))
    .reduceByKey(lambda left, right: left + right)
)
cust_per_city1.collect()

[('Pune', 1), ('Hyderabad', 2), ('Bangalore', 2), ('Ahmedabad', 1)]

In [62]:
# reduceByKey() is a transformation, so the variable holds an RDD object, not the computed counts.
cust_per_city1 #transformation - so returns a RDD

PythonRDD[74] at collect at /tmp/ipykernel_152595/2418480433.py:3

In [59]:
# countByValue() is an action: it counts each distinct city and returns a
# dict-like result to the driver instead of another RDD.
cust_per_city2 = (
    parsed_rdd
    .map(lambda rec: rec[2])
    .countByValue()
)
cust_per_city2  # a plain Python mapping, not an RDD

defaultdict(int, {'Pune': 1, 'Bangalore': 2, 'Hyderabad': 2, 'Ahmedabad': 1})

In [63]:
# collect() returns every parsed row to the driver - fine for this six-row sample, avoid on large RDDs.
parsed_rdd.collect()

[(0, 'Customer_0', 'Pune', 'Maharashtra', 'India', '2023-06-29', False),
 (1, 'Customer_1', 'Bangalore', 'Tamil Nadu', 'India', '2023-12-07', True),
 (2, 'Customer_2', 'Hyderabad', 'Gujarat', 'India', '2023-10-27', True),
 (3, 'Customer_3', 'Bangalore', 'Karnataka', 'India', '2023-10-17', False),
 (4, 'Customer_4', 'Ahmedabad', 'Karnataka', 'India', '2023-03-14', False),
 (5, 'Customer_5', 'Hyderabad', 'Karnataka', 'India', '2023-07-28', False)]

In [66]:
# Cities with at least one active customer: keep active rows, project the city, de-duplicate.
active_cities = (
    parsed_rdd
    .filter(lambda rec: rec[6])
    .map(lambda rec: rec[2])
    .distinct()
)
active_cities.collect()

['Hyderabad', 'Bangalore']

In [74]:
# Active customers per state: keep active rows, key each by state (index 3), sum the ones.
active_cust_count = (
    parsed_rdd.filter(lambda rec: rec[6])
              .map(lambda rec: (rec[3], 1))
              .reduceByKey(lambda left, right: left + right)
)
active_cust_count.collect()

[('Tamil Nadu', 1), ('Gujarat', 1)]

In [None]:
# Save the active cities to HDFS.
# saveAsTextFile() writes a *directory* of part-files (the ".csv" suffix is only a name),
# and it refuses to write to a path that already exists - so remove any previous output
# first to keep this cell re-runnable under Restart & Run All.
# NOTE(review): sc._jvm / sc._jsc are private PySpark handles to the Hadoop FileSystem API.
output_path = "/tmp/active_cities.csv"
hadoop_path = sc._jvm.org.apache.hadoop.fs.Path(output_path)
hdfs = hadoop_path.getFileSystem(sc._jsc.hadoopConfiguration())
if hdfs.exists(hadoop_path):
    hdfs.delete(hadoop_path, True)  # recursive, because the previous output is a directory
active_cities.saveAsTextFile(output_path)

In [77]:
# Shell out to list /tmp on the Hadoop filesystem and confirm the saveAsTextFile output directory is there.
!hadoop fs -ls /tmp

Found 4 items
drwxr-xr-x   - root hadoop          0 2025-05-28 03:37 /tmp/active_cities.csv
drwxrwxrwt   - hdfs hadoop          0 2025-04-18 06:05 /tmp/hadoop-yarn
drwx-wx-wx   - hive hadoop          0 2025-04-18 06:05 /tmp/hive
-rw-r--r--   2 root hadoop         42 2025-04-24 04:12 /tmp/inputdbztest.txt


In [81]:
# Shut down the SparkSession created at the start of the notebook; re-run from the top to use spark/sc again.
spark.stop()