# Spark - Operations

In [1]:
import shutil

from pyspark.sql import SparkSession

In [2]:
# Reuse the active Spark session if one exists; otherwise start a new one
# registered under the application name "RDD_Operations".
spark = (
    SparkSession.builder
    .appName("RDD_Operations")
    .getOrCreate()
)

In [3]:
# Inline CSV sample: one header line followed by six customer records.
# splitlines() turns the block into a list with one string per line.
customer_data = """\
customer_id,name,city,state,country,registration_date,is_active
0,Customer_0,Bangalore,Karnataka,India,2023-11-11,True
1,Customer_1,Hyderabad,Delhi,India,2023-08-26,True
2,Customer_2,Ahmedabad,West Bengal,India,2023-06-23,True
3,Customer_3,Bangalore,Tamil Nadu,India,2023-03-24,False
4,Customer_4,Bangalore,Gujarat,India,2023-06-06,False
5,Customer_5,Delhi,Maharashtra,India,2023-04-19,False
""".splitlines()


In [4]:
data_rdd = spark.sparkContext.parallelize(customer_data)

In [5]:
data_rdd.collect()

['customer_id,name,city,state,country,registration_date,is_active',
 '0,Customer_0,Bangalore,Karnataka,India,2023-11-11,True',
 '1,Customer_1,Hyderabad,Delhi,India,2023-08-26,True',
 '2,Customer_2,Ahmedabad,West Bengal,India,2023-06-23,True',
 '3,Customer_3,Bangalore,Tamil Nadu,India,2023-03-24,False',
 '4,Customer_4,Bangalore,Gujarat,India,2023-06-06,False',
 '5,Customer_5,Delhi,Maharashtra,India,2023-04-19,False']

In [6]:
data_rdd.getNumPartitions()

2

# first() - returns the first element of the RDD

In [7]:
header = data_rdd.first()

In [8]:
header

'customer_id,name,city,state,country,registration_date,is_active'

# filter() -> a transformation that keeps only the elements satisfying the given condition

In [9]:
# Drop the header line by keeping every row that differs from `header`.
# NOTE(review): `data_rdd` is rebound to the filtered RDD, so re-running the
# earlier `header = data_rdd.first()` cell after this one would capture the
# first customer record as `header`, and re-running this cell would then
# discard that record.
data_rdd = data_rdd.filter(lambda row : row!=header)

In [10]:
data_rdd

PythonRDD[2] at RDD at PythonRDD.scala:53

# Map() - It applies a function to each element in an RDD.

In [11]:
data_rdd.collect()

['0,Customer_0,Bangalore,Karnataka,India,2023-11-11,True',
 '1,Customer_1,Hyderabad,Delhi,India,2023-08-26,True',
 '2,Customer_2,Ahmedabad,West Bengal,India,2023-06-23,True',
 '3,Customer_3,Bangalore,Tamil Nadu,India,2023-03-24,False',
 '4,Customer_4,Bangalore,Gujarat,India,2023-06-06,False',
 '5,Customer_5,Delhi,Maharashtra,India,2023-04-19,False']

In [30]:
first_row = data_rdd.first()
first_row.split(',')[6]=='True'

True

In [29]:
first_row.split(',')

['0', 'Customer_0', 'Bangalore', 'Karnataka', 'India', '2023-11-11', 'True']

In [31]:
first_row.split(',')[2]

'Bangalore'

We apply this function to cast the id column to `int` and the last column to a Boolean.

In [32]:
def parse_row(row):
    """Convert one comma-separated customer record into a typed tuple.

    Args:
        row: A CSV line of the form
            ``customer_id,name,city,state,country,registration_date,is_active``.

    Returns:
        A 7-tuple ``(customer_id, name, city, state, country,
        registration_date, is_active)`` where ``customer_id`` is an ``int``,
        ``is_active`` is a ``bool`` (``True`` only when the field is exactly
        ``'True'`` after trimming) and the remaining fields are strings.

    Raises:
        ValueError: If the first field is not a valid integer.
        IndexError: If the row has fewer than seven fields.
    """
    # Trim every field so stray spaces or a trailing carriage return/newline
    # (e.g. lines read from a file with Windows line endings) cannot silently
    # turn is_active into False.
    fields = [field.strip() for field in row.split(',')]
    return (
        int(fields[0]),
        fields[1],
        fields[2],
        fields[3],
        fields[4],
        fields[5],
        fields[6] == 'True',
    )

In [36]:
parse_row(first_row)

(0, 'Customer_0', 'Bangalore', 'Karnataka', 'India', '2023-11-11', True)

With `map()`, every row of `data_rdd` is passed as the argument to the `parse_row` function.

In [33]:
parsed_rdd = data_rdd.map(parse_row)

In [18]:
parsed_rdd.collect()

[(0, 'Customer_0', 'Bangalore', 'Karnataka', 'India', '2023-11-11', True),
 (1, 'Customer_1', 'Hyderabad', 'Delhi', 'India', '2023-08-26', True),
 (2, 'Customer_2', 'Ahmedabad', 'West Bengal', 'India', '2023-06-23', True),
 (3, 'Customer_3', 'Bangalore', 'Tamil Nadu', 'India', '2023-03-24', False),
 (4, 'Customer_4', 'Bangalore', 'Gujarat', 'India', '2023-06-06', False),
 (5, 'Customer_5', 'Delhi', 'Maharashtra', 'India', '2023-04-19', False)]

# Advanced RDD Operations

In [47]:
# Project every parsed record down to a (name, city) pair and show them all.
name_city_rdd = parsed_rdd.map(lambda record: (record[1], record[2]))
name_city_rdd.collect()

[('Customer_0', 'Bangalore'),
 ('Customer_1', 'Hyderabad'),
 ('Customer_2', 'Ahmedabad'),
 ('Customer_3', 'Bangalore'),
 ('Customer_4', 'Bangalore'),
 ('Customer_5', 'Delhi')]

In [49]:
# Extract fields with map() and look at the first row only.
# `name_city_rdd` was already built by the map() in the previous cell, so it
# is reused here instead of being recomputed with an identical expression.
name_city_rdd.first()

('Customer_0', 'Bangalore')

In [43]:
# Keep only the records whose is_active flag (index 6) is set. parse_row has
# already converted that field to a bool, so it is used directly as the
# predicate rather than being compared with `== True`.
active_customers = parsed_rdd.filter(lambda row: row[6])

In [44]:
active_customers.collect()

[(0, 'Customer_0', 'Bangalore', 'Karnataka', 'India', '2023-11-11', True),
 (1, 'Customer_1', 'Hyderabad', 'Delhi', 'India', '2023-08-26', True),
 (2, 'Customer_2', 'Ahmedabad', 'West Bengal', 'India', '2023-06-23', True)]

In [50]:
# distinct() is a transformation: project the city column (index 2) and
# keep each city value once.
cities_rdd = (
    parsed_rdd
    .map(lambda record: record[2])
    .distinct()
)

In [51]:
cities_rdd.collect()

['Hyderabad', 'Delhi', 'Bangalore', 'Ahmedabad']

In [52]:
# take()

cities_rdd.take(3)

['Hyderabad', 'Delhi', 'Bangalore']

In [None]:
# Reduce By Key - Transformation
# combines values for each key using an associative reduce function.

In [54]:

# Emit one (city, 1) pair per customer so the ones can later be summed per city.
customers_per_city = parsed_rdd.map(lambda record: (record[2], 1))

In [55]:
customers_per_city.collect()

[('Bangalore', 1),
 ('Hyderabad', 1),
 ('Ahmedabad', 1),
 ('Bangalore', 1),
 ('Bangalore', 1),
 ('Delhi', 1)]

`reduceByKey()` is a Transformation

In [56]:
# Sum the per-customer 1s for each city. The pipeline is rebuilt from
# `parsed_rdd` rather than from `customers_per_city` itself, so the result of
# this cell does not depend on what that name currently holds.
customers_per_city = (
    parsed_rdd
    .map(lambda row: (row[2], 1))
    .reduceByKey(lambda x, y: x + y)
)

In [57]:
customers_per_city.collect()

[('Hyderabad', 1), ('Delhi', 1), ('Bangalore', 3), ('Ahmedabad', 1)]

`countByValue()` is an Action

In [58]:
# Tally how many customers belong to each city: project the city column
# (index 2) and count the occurrences of every value.
cust_per_city = (
    parsed_rdd
    .map(lambda record: record[2])
    .countByValue()
)

In [59]:
cust_per_city

defaultdict(int, {'Bangalore': 3, 'Hyderabad': 1, 'Ahmedabad': 1, 'Delhi': 1})

# Important - reduceByKey is a Transformation while countByValue is an Action

In [63]:
parsed_rdd.collect()

[(0, 'Customer_0', 'Bangalore', 'Karnataka', 'India', '2023-11-11', True),
 (1, 'Customer_1', 'Hyderabad', 'Delhi', 'India', '2023-08-26', True),
 (2, 'Customer_2', 'Ahmedabad', 'West Bengal', 'India', '2023-06-23', True),
 (3, 'Customer_3', 'Bangalore', 'Tamil Nadu', 'India', '2023-03-24', False),
 (4, 'Customer_4', 'Bangalore', 'Gujarat', 'India', '2023-06-06', False),
 (5, 'Customer_5', 'Delhi', 'Maharashtra', 'India', '2023-04-19', False)]

In [72]:
# Cities of the active customers: filter on the is_active flag (index 6),
# then project the city column (index 2). Duplicates are not removed here.
ACTIVE = parsed_rdd.filter(lambda record: record[6])
(
    ACTIVE
    .map(lambda record: record[2])
    .collect()
)

['Bangalore', 'Hyderabad', 'Ahmedabad']

# Combine More Operations

In [73]:
# Distinct cities that have at least one active customer:
# filter on is_active (index 6) -> project city (index 2) -> de-duplicate.
active_cities = (
    parsed_rdd
    .filter(lambda record: record[6])
    .map(lambda record: record[2])
    .distinct()
)

active_cities.collect()

['Hyderabad', 'Bangalore', 'Ahmedabad']

# HW <-----

# Count Active Customer By State

In [82]:
# TODO: run this code on a 500 MB file as well.

# Count the active customers per state: keep the active records (index 6),
# project the state column (index 3), then tally each state's occurrences.
active_state = (
    parsed_rdd
    .filter(lambda record: record[6])
    .map(lambda record: record[3])
    .countByValue()
)

active_state

defaultdict(int, {'Karnataka': 1, 'Delhi': 1, 'West Bengal': 1})

In [76]:
# saveAsTextFile() writes a *directory* of part files at the given path (not a
# single CSV file) and raises if that path already exists. Remove any output
# left by a previous run first so this cell can be re-executed.
# NOTE(review): shutil only touches the local filesystem; this assumes Spark's
# default filesystem is local (as the /content path suggests) - confirm.
active_cities_output = "/content/active_cities.csv"
shutil.rmtree(active_cities_output, ignore_errors=True)
active_cities.saveAsTextFile(active_cities_output)

In [83]:
spark.stop()