In [0]:
from pyspark.sql import SparkSession

In [0]:
# Build (or reuse) the SparkSession for this notebook.
# getOrCreate() reuses an already-active session if one exists.
# NOTE(review): the master is hardcoded to "yarn" — confirm this matches the target cluster.
spark = (
    SparkSession.builder
    .appName("RDD_Operations")
    .master("yarn")
    .getOrCreate()
)

In [0]:
# Small in-memory sample: the first string is the CSV header, the rest are data rows.
# NOTE(review): city/state pairs look synthetic (e.g. Hyderabad listed under Delhi),
# so don't rely on them for geography.
customers_data = [
"customer_id,name,city,state,country,registration_date,is_active",
"0,Customer_0,Bangalore,Karnataka,India,2023-11-11,True",
"1,Customer_1,Hyderabad,Delhi,India,2023-08-26,True",
"2,Customer_2,Ahmedabad,West Bengal,India,2023-06-23,True",
"3,Customer_3,Bangalore,Tamil Nadu,India,2023-03-24,False",
"4,Customer_4,Bangalore,Gujarat,India,2023-06-06,False",
"5,Customer_5,Delhi,Maharashtra,India,2023-04-19,False"]

In [0]:
# Distribute the lines as an RDD of strings. No numSlices argument is given, so the
# partition count comes from Spark's default parallelism (8 in the recorded run).
data_rdd = spark.sparkContext.parallelize(customers_data)

In [0]:

# Action: bring every element back to the driver (fine here, the data is only 7 lines).
data_rdd.collect()

Out[5]: ['customer_id,name,city,state,country,registration_date,is_active',
 '0,Customer_0,Bangalore,Karnataka,India,2023-11-11,True',
 '1,Customer_1,Hyderabad,Delhi,India,2023-08-26,True',
 '2,Customer_2,Ahmedabad,West Bengal,India,2023-06-23,True',
 '3,Customer_3,Bangalore,Tamil Nadu,India,2023-03-24,False',
 '4,Customer_4,Bangalore,Gujarat,India,2023-06-06,False',
 '5,Customer_5,Delhi,Maharashtra,India,2023-04-19,False']

In [0]:
# Display the active SparkSession object.
spark

In [0]:
# How many partitions parallelize() split the data into.
data_rdd.getNumPartitions()

Out[7]: 8

In [0]:
# first(): action returning the first element, here the CSV header line.
# NOTE: run this before the header-filter cell. After that filter, data_rdd's first
# element is a data row, so re-running this cell would capture the wrong "header".
header=data_rdd.first()

In [0]:
# Echo the captured header line.
print(header)

customer_id,name,city,state,country,registration_date,is_active


In [0]:
# filter: drop the header line, keeping only data rows.
# `header` is bound as a default argument so the lambda keeps *this* value. A plain
# global lookup happens when the lambda runs, so a later rebinding of `header`
# (this notebook rebinds it when reading the CSV file) would silently change what
# this filter removes the next time an action re-executes this lineage.
data_rdd = data_rdd.filter(lambda row, header=header: row != header)

In [0]:
# collect() returns a Python list; print shows it on a single line.
print(data_rdd.collect())

['0,Customer_0,Bangalore,Karnataka,India,2023-11-11,True', '1,Customer_1,Hyderabad,Delhi,India,2023-08-26,True', '2,Customer_2,Ahmedabad,West Bengal,India,2023-06-23,True', '3,Customer_3,Bangalore,Tamil Nadu,India,2023-03-24,False', '4,Customer_4,Bangalore,Gujarat,India,2023-06-06,False', '5,Customer_5,Delhi,Maharashtra,India,2023-04-19,False']


In [0]:
# Map
# Try the per-row split on one line locally before mapping it over the RDD.
test_data='0,Customer_0,Bangalore,Karnataka,India,2023-11-11,True'
print(test_data.split(','))

['0', 'Customer_0', 'Bangalore', 'Karnataka', 'India', '2023-11-11', 'True']


In [0]:
def parse_data(row: str) -> tuple:
    """Parse one customer CSV line into a typed tuple.

    Args:
        row: a line such as
            ``"0,Customer_0,Bangalore,Karnataka,India,2023-11-11,True"``,
            with exactly seven comma-separated fields and no quoted commas.

    Returns:
        ``(customer_id: int, name, city, state, country, registration_date,
        is_active: bool)``. ``registration_date`` is kept as the raw string.
        ``is_active`` is True only when the field is exactly ``'True'``
        (ignoring surrounding whitespace).

    Raises:
        ValueError: if the line does not have exactly seven fields, or
            ``customer_id`` is not an integer.
    """
    # Unpacking (instead of indexing) rejects malformed rows loudly. Indexing would
    # raise IndexError on short rows and silently ignore extra fields.
    customer_id, name, city, state, country, registration_date, is_active = row.split(',')
    return (
        int(customer_id),
        name,
        city,
        state,
        country,
        registration_date,
        # strip() so a trailing '\r' (e.g. CRLF input) doesn't turn True into False.
        is_active.strip() == 'True',
    )

In [0]:
# Transformation (lazy): turn each line into a typed tuple via parse_data.
parsed_rdd=data_rdd.map(parse_data)

In [0]:
# Action: materialize the parsed tuples on the driver.
parsed_rdd.collect()

Out[15]: [(0, 'Customer_0', 'Bangalore', 'Karnataka', 'India', '2023-11-11', True),
 (1, 'Customer_1', 'Hyderabad', 'Delhi', 'India', '2023-08-26', True),
 (2, 'Customer_2', 'Ahmedabad', 'West Bengal', 'India', '2023-06-23', True),
 (3, 'Customer_3', 'Bangalore', 'Tamil Nadu', 'India', '2023-03-24', False),
 (4, 'Customer_4', 'Bangalore', 'Gujarat', 'India', '2023-06-06', False),
 (5, 'Customer_5', 'Delhi', 'Maharashtra', 'India', '2023-04-19', False)]

In [0]:
# Project each parsed tuple to (name, city), i.e. indices 1 and 2.
name_city_rdd=parsed_rdd.map(lambda row:(row[1],row[2]))
name_city_rdd.collect()

Out[16]: [('Customer_0', 'Bangalore'),
 ('Customer_1', 'Hyderabad'),
 ('Customer_2', 'Ahmedabad'),
 ('Customer_3', 'Bangalore'),
 ('Customer_4', 'Bangalore'),
 ('Customer_5', 'Delhi')]

In [0]:
# Unique city names (index 2). distinct() requires a shuffle.
cities_rdd=parsed_rdd.map(lambda row: row[2]).distinct()

In [0]:
# take(3): the first 3 elements of the RDD.
# NOTE: after distinct() the element order is not guaranteed to follow input order.
cities_rdd.take(3)

Out[19]: ['Bangalore', 'Delhi', 'Hyderabad']

In [0]:
## ReduceByKey
# Count customers per city: emit (city, 1) pairs and sum the 1s per key.
# NOTE(review): despite its name, customers_rdd holds (city, count) pairs, not customers.
customers_rdd=parsed_rdd.map(lambda row:(row[2],1)).reduceByKey(lambda x,y:x+y)

In [0]:
# Show the per-city counts as a plain list of (city, count) tuples.
print(customers_rdd.collect())

[('Bangalore', 3), ('Delhi', 1), ('Hyderabad', 1), ('Ahmedabad', 1)]


In [0]:
# Same per-city counts via countByValue(). This action returns a dict-like object
# straight to the driver, so it suits only a small number of distinct values.
parsed_rdd.map(lambda row:row[2]).countByValue()

Out[22]: defaultdict(int, {'Bangalore': 3, 'Hyderabad': 1, 'Ahmedabad': 1, 'Delhi': 1})

In [0]:
# Transformations are lazy: with no action this only builds the RDD lineage,
# so the cell displays an RDD object rather than the counts.
parsed_rdd.map(lambda row:(row[2],1)).reduceByKey(lambda x,y:x+y)

Out[23]: PythonRDD[49] at RDD at PythonRDD.scala:58

In [0]:
## Find cities with at least one active customer
# Index 6 is the is_active flag, already a bool from parse_data, so test it
# directly instead of comparing to True. Then reduce to unique city names.
active_cities = parsed_rdd.filter(lambda row: row[6]).map(lambda row: row[2]).distinct()

In [0]:
# Action: list the distinct cities that have an active customer.
active_cities.collect()

Out[25]: ['Bangalore', 'Hyderabad', 'Ahmedabad']

In [0]:
# NOTE(review): "/FileStore/tables/..." looks like a Databricks DBFS upload path;
# confirm the location when running elsewhere.
file_path = "/FileStore/tables/customers.csv"

# Read as RDD (line by line, just like HDFS textFile): each element is one
# line of the file as a string, header line included.
rdd = spark.sparkContext.textFile(file_path)

# Show a few lines
rdd.take(5)


Out[5]: ['customer_id,name,city,state,country,registration_date,is_active',
 '0,Customer_0,Pune,West Bengal,India,2023-10-10,True',
 '1,Customer_1,Bangalore,Gujarat,India,2023-10-19,False',
 '2,Customer_2,Bangalore,Karnataka,India,2023-02-10,True',
 '3,Customer_3,Bangalore,Telangana,India,2023-03-24,True']

In [0]:
# Header line of the CSV file.
# NOTE: this rebinds the global `header` used earlier for the in-memory data.
# Lambdas that read the global `header` look it up when they run, not when defined.
header=rdd.first()


In [0]:
# Drop the header line and any blank lines, then split each line into a list of
# string fields. Blank lines would otherwise break row[2] lookups downstream.
# `header` is bound as a default argument so a later rebinding of the global
# can't change which line this filter removes.
# NOTE(review): split(',') does not handle quoted fields that contain commas.
rdd_no_header = (
    rdd.filter(lambda row, header=header: bool(row.strip()) and row != header)
    .map(lambda row: row.split(','))
)

In [0]:
# Peek at the first split row: a list of strings, nothing cast yet.
rdd_no_header.first()

Out[8]: ['0', 'Customer_0', 'Pune', 'West Bengal', 'India', '2023-10-10', 'True']

In [0]:
# Per-city customer counts from the file: map each row to (city, 1) and sum per key.
# reduceByKey combines values within each partition before shuffling.
reduced_rdd=rdd_no_header.map(lambda row:(row[2],1)).reduceByKey(lambda x,y:x+y)
reduced_rdd.collect()

Out[10]: [('Chennai', 21046),
 ('Mumbai', 21041),
 ('Pune', 21481),
 ('Bangalore', 21272),
 ('Hyderabad', 21174),
 ('Ahmedabad', 21272),
 ('Delhi', 21123),
 ('Kolkata', 21264)]

In [0]:
 grouped_by_rdd = rdd_no_header.map(lambda row:(row[2],1)).groupByKey()
 grouped_by_rdd.collect()

Out[11]: [('Chennai', <pyspark.resultiterable.ResultIterable at 0x731a898fae80>),
 ('Mumbai', <pyspark.resultiterable.ResultIterable at 0x731a898fa730>),
 ('Pune', <pyspark.resultiterable.ResultIterable at 0x731a898fa5b0>),
 ('Bangalore', <pyspark.resultiterable.ResultIterable at 0x731a898fa7f0>),
 ('Hyderabad', <pyspark.resultiterable.ResultIterable at 0x731a898fafa0>),
 ('Ahmedabad', <pyspark.resultiterable.ResultIterable at 0x731a898fa670>),
 ('Delhi', <pyspark.resultiterable.ResultIterable at 0x731a898fafd0>),
 ('Kolkata', <pyspark.resultiterable.ResultIterable at 0x731a8a0e8ee0>)]

In [0]:
# Turn each key's grouped iterable into its size, giving (city, count) pairs.
# mapValues leaves keys untouched and retains the RDD's partitioning, unlike a
# plain map over (key, values) tuples.
grouped_by_result = grouped_by_rdd.mapValues(len)
grouped_by_result.collect()

Out[12]: [('Chennai', 21046),
 ('Mumbai', 21041),
 ('Pune', 21481),
 ('Bangalore', 21272),
 ('Hyderabad', 21174),
 ('Ahmedabad', 21272),
 ('Delhi', 21123),
 ('Kolkata', 21264)]