In [None]:
from pyspark.sql import SparkSession
# Get the Spark session used by every cell below; getOrCreate() returns an
# already-running session when one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("RDD_Exercises")
    .getOrCreate()
)

In [None]:
# Sample call records: one header line followed by 20 data rows with the
# columns call_id, caller, receiver, city, call_type, duration_seconds, cost.
data = """call_id,caller,receiver,city,call_type,duration_seconds,cost
C001,Amit,Rahul,Hyderabad,Local,180,2.5
C002,Neha,Arjun,Bangalore,STD,320,6.0
C003,Rahul,Pooja,Delhi,Local,60,1.0
C004,Pooja,Neha,Mumbai,ISD,900,25.0
C005,Arjun,Amit,Chennai,STD,400,7.5
C006,Sneha,Karan,Hyderabad,Local,240,3.0
C007,Karan,Sneha,Delhi,Local,120,2.0
C008,Riya,Vikas,Bangalore,STD,360,6.5
C009,Vikas,Riya,Mumbai,ISD,1100,30.0
C010,Anjali,Sanjay,Chennai,Local,90,1.5
C011,Farhan,Ayesha,Delhi,STD,420,7.0
C012,Ayesha,Farhan,Hyderabad,ISD,950,28.0
C013,Suresh,Divya,Bangalore,Local,150,2.0
C014,Divya,Suresh,Mumbai,STD,380,6.8
C015,Nikhil,Priya,Delhi,Local,200,2.8
C016,Priya,Nikhil,Chennai,STD,410,7.2
C017,Rohit,Kavya,Hyderabad,Local,170,2.3
C018,Kavya,Rohit,Bangalore,Local,140,2.1
C019,Manish,Tina,Mumbai,ISD,1000,27.0
C020,Tina,Manish,Delhi,STD,350,6.2"""

# Persist the sample to disk so the following cells can load it from a file.
with open("call_records.csv", "w") as csv_file:
    csv_file.write(data)

Task 1

Read the CSV file using sparkContext.textFile and display the first 5 records.

In [None]:
# Load the CSV as an RDD of raw text lines; the header line is still included.
rdd = (
    spark.sparkContext
    .textFile("call_records.csv")
)

# Preview the first five lines.
rdd.take(5)

['call_id,caller,receiver,city,call_type,duration_seconds,cost',
 'C001,Amit,Rahul,Hyderabad,Local,180,2.5',
 'C002,Neha,Arjun,Bangalore,STD,320,6.0',
 'C003,Rahul,Pooja,Delhi,Local,60,1.0',
 'C004,Pooja,Neha,Mumbai,ISD,900,25.0']

Task 2

Remove the header row and create a clean RDD containing only data rows.

In [None]:
# The first line of the file holds the column names.
header = rdd.first()

# Keep every line that is not identical to the header line.
data_rdd = rdd.filter(lambda line: not (line == header))
data_rdd.collect()

['C001,Amit,Rahul,Hyderabad,Local,180,2.5',
 'C002,Neha,Arjun,Bangalore,STD,320,6.0',
 'C003,Rahul,Pooja,Delhi,Local,60,1.0',
 'C004,Pooja,Neha,Mumbai,ISD,900,25.0',
 'C005,Arjun,Amit,Chennai,STD,400,7.5',
 'C006,Sneha,Karan,Hyderabad,Local,240,3.0',
 'C007,Karan,Sneha,Delhi,Local,120,2.0',
 'C008,Riya,Vikas,Bangalore,STD,360,6.5',
 'C009,Vikas,Riya,Mumbai,ISD,1100,30.0',
 'C010,Anjali,Sanjay,Chennai,Local,90,1.5',
 'C011,Farhan,Ayesha,Delhi,STD,420,7.0',
 'C012,Ayesha,Farhan,Hyderabad,ISD,950,28.0',
 'C013,Suresh,Divya,Bangalore,Local,150,2.0',
 'C014,Divya,Suresh,Mumbai,STD,380,6.8',
 'C015,Nikhil,Priya,Delhi,Local,200,2.8',
 'C016,Priya,Nikhil,Chennai,STD,410,7.2',
 'C017,Rohit,Kavya,Hyderabad,Local,170,2.3',
 'C018,Kavya,Rohit,Bangalore,Local,140,2.1',
 'C019,Manish,Tina,Mumbai,ISD,1000,27.0',
 'C020,Tina,Manish,Delhi,STD,350,6.2']

Task 3

Split each row into individual fields using a delimiter.

In [None]:
# Break each comma-separated line into a list of string fields.
split_rdd = data_rdd.map(lambda line: line.split(sep=","))
split_rdd.take(5)

[['C001', 'Amit', 'Rahul', 'Hyderabad', 'Local', '180', '2.5'],
 ['C002', 'Neha', 'Arjun', 'Bangalore', 'STD', '320', '6.0'],
 ['C003', 'Rahul', 'Pooja', 'Delhi', 'Local', '60', '1.0'],
 ['C004', 'Pooja', 'Neha', 'Mumbai', 'ISD', '900', '25.0'],
 ['C005', 'Arjun', 'Amit', 'Chennai', 'STD', '400', '7.5']]

Task 4

Calculate the total call cost per city.

In [None]:
# Pair each record's city (field 3) with its cost (field 6) parsed as a float.
city_cost_rdd = split_rdd.map(lambda fields: (fields[3], float(fields[6])))

# Add up the costs that share the same city key.
total_cost_per_city = city_cost_rdd.reduceByKey(lambda left, right: left + right)
total_cost_per_city.collect()

[('Hyderabad', 35.8),
 ('Delhi', 19.0),
 ('Mumbai', 88.8),
 ('Bangalore', 16.6),
 ('Chennai', 16.2)]

Task 5

Identify the city with the highest total call cost.

In [None]:
# Pairwise comparison over (city, total_cost) tuples: the left tuple survives
# only when its total is strictly greater; otherwise the right tuple is kept.
highest_city = total_cost_per_city.reduce(
    lambda left, right: right if not (left[1] > right[1]) else left
)
highest_city

('Mumbai', 88.8)

Task 6

Calculate the total call duration per call type (Local, STD, ISD).

In [None]:
# Pair each record's call type (field 4) with its duration in seconds (field 5).
call_duration_per_type = split_rdd.map(lambda x: (x[4], int(x[5])))

# Sum the durations per call type. The combining function has to be a plain
# addition (associative and commutative): the previous `a + b / 3600` scaled
# only the right-hand operand on every merge, so the result was neither seconds
# nor hours and depended on the order in which values were merged.
total_call_duration_per_type = call_duration_per_type.reduceByKey(lambda a, b: a + b)

# Totals are in seconds; for the sample data: Local 1350, STD 2640, ISD 3950.
total_call_duration_per_type.collect()

[('Local', 180.1833726851852),
 ('STD', 320.3278657407407),
 ('ISD', 900.5695216049382)]

Task 7

Count the number of calls per city.

In [None]:
# Emit a (city, 1) pair per call so the ones can be summed into a call count.
city_calls_rdd = split_rdd.map(lambda fields: (fields[3], 1))

# Add the per-call ones together for each city.
total_calls_per_city = city_calls_rdd.reduceByKey(lambda left, right: left + right)
total_calls_per_city.collect()

[('Hyderabad', 4),
 ('Delhi', 5),
 ('Mumbai', 4),
 ('Bangalore', 4),
 ('Chennai', 3)]

Task 8

Calculate the average call cost per city using RDD transformations.

In [None]:

# Tag every record as (city, (cost, 1)) so sums and counts travel together.
avg_cost = split_rdd.map(lambda fields: (fields[3], (float(fields[6]), 1)))

# Merge the (cost_sum, call_count) pairs that belong to the same city.
sum_and_count = avg_cost.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)

# Divide the summed cost by the call count and round to two decimals.
average_cost_per_city = sum_and_count.mapValues(
    lambda totals: round(totals[0] / totals[1], 2)
)
average_cost_per_city.collect()



[('Hyderabad', 8.95),
 ('Delhi', 3.8),
 ('Mumbai', 22.2),
 ('Bangalore', 4.15),
 ('Chennai', 5.4)]

Task 9

Filter and list all high-value calls where call cost is greater than 20.

In [None]:
# Keep the records whose cost (field 6) is above 20, then project each one to
# a (call_id, city, cost) tuple.
high_value_calls = (
    split_rdd
    .filter(lambda fields: float(fields[6]) > 20.0)
    .map(lambda fields: (fields[0], fields[3], float(fields[6])))
)
high_value_calls.collect()


[('C004', 'Mumbai', 25.0),
 ('C009', 'Mumbai', 30.0),
 ('C012', 'Hyderabad', 28.0),
 ('C019', 'Mumbai', 27.0)]

Task 10

Count the number of ISD calls per city.

In [None]:
# Restrict to ISD records (call type is field 4) and emit (city, 1) for each.
isd_calls_per_city = (
    split_rdd
    .filter(lambda fields: fields[4] == "ISD")
    .map(lambda fields: (fields[3], 1))
)

# Sum the ones per city to get the ISD call count.
total_isd_calls_per_city = isd_calls_per_city.reduceByKey(
    lambda left, right: left + right
)
total_isd_calls_per_city.collect()

[('Mumbai', 3), ('Hyderabad', 1)]

Task 11

Identify the longest call based on call duration.

In [None]:
# The task asks for the single longest call, so compare individual calls
# instead of aggregating durations per city (the earlier version summed by
# city with `a + b / 3600`, which answered a different question and produced
# values that were neither seconds nor hours).

# Pair each call's id (field 0) with its duration in seconds (field 5).
call_duration = split_rdd.map(lambda x: (x[0], int(x[5])))

# Keep whichever (call_id, seconds) tuple has the larger duration; on a tie the
# right-hand tuple is kept.
longest_call_duration = call_duration.reduce(
    lambda a, b: a if a[1] > b[1] else b
)

# For the sample data this is ('C009', 1100).
longest_call_duration

('Mumbai', 900.4111882716049)

Task 12

Calculate the total revenue generated by each caller.

In [None]:
# Pair each record's caller (field 1) with the call's cost (field 6).
total_revenue = split_rdd.map(lambda fields: (fields[1], float(fields[6])))

# Add up the costs attributed to the same caller.
total_cost_per_person = total_revenue.reduceByKey(lambda left, right: left + right)
total_cost_per_person.collect()

[('Amit', 2.5),
 ('Pooja', 25.0),
 ('Karan', 2.0),
 ('Riya', 6.5),
 ('Vikas', 30.0),
 ('Suresh', 2.0),
 ('Divya', 6.8),
 ('Nikhil', 2.8),
 ('Rohit', 2.3),
 ('Manish', 27.0),
 ('Tina', 6.2),
 ('Neha', 6.0),
 ('Rahul', 1.0),
 ('Arjun', 7.5),
 ('Sneha', 3.0),
 ('Anjali', 1.5),
 ('Farhan', 7.0),
 ('Ayesha', 28.0),
 ('Priya', 7.2),
 ('Kavya', 2.1)]

Task 13

Detect suspicious calls based on the following rule:
- duration greater than 900 seconds
- cost greater than 25

In [None]:
# Flag records that exceed both thresholds: cost (field 6) above 25 and
# duration (field 5) above 900 seconds.
suspicious_calls = split_rdd.filter(
    lambda fields: float(fields[6]) > 25.0 and float(fields[5]) > 900
)
suspicious_calls.collect()

[['C009', 'Vikas', 'Riya', 'Mumbai', 'ISD', '1100', '30.0'],
 ['C012', 'Ayesha', 'Farhan', 'Hyderabad', 'ISD', '950', '28.0'],
 ['C019', 'Manish', 'Tina', 'Mumbai', 'ISD', '1000', '27.0']]