In [101]:
spark


In [102]:
from pyspark.sql import SparkSession

In [103]:
spark=SparkSession.builder \
.appName("RDD_Operations")\
.getOrCreate()

In [78]:
customer_data=[
    "customer_id,name,city,state,country,registration_date,is_active",
    "0,customer_0,banglore,karnataka,India,2023-11-11,True",
    "1,customer_1,Hyderabad,Telangana,India,2023-08-26,True",
    "2,customer_2,Ahmdabad,WestBengal,India,2022-08-13,True",
    "3,customer_3,jaipur,Rajastha,India,2021-09-18,False",
    "4,customer_4,Delhi,Haryana,India,2023-04-25,False",
    "5,customer_5,Pune,Maharashtra,India,2022-05-28,False",
]

In [79]:
data_rdd=spark.sparkContext.parallelize(customer_data)

In [80]:
data_rdd.getNumPartitions()

2

## RDD-Resilient Distributed dataset


- first()-return first elemnet of RDD


In [81]:
header=data_rdd.first()

                                                                                

In [82]:
header


'customer_id,name,city,state,country,registration_date,is_active'

# filter()=based on condition its going to transform dataset

In [83]:
data_rdd=data_rdd.filter(lambda row:row!=header)


In [84]:
data_rdd

PythonRDD[2] at RDD at PythonRDD.scala:53

# Map()=it applies a function to each element on rdd


In [85]:
data_rdd.collect()

                                                                                

['0,customer_0,banglore,karnataka,India,2023-11-11,True',
 '1,customer_1,Hyderabad,Telangana,India,2023-08-26,True',
 '2,customer_2,Ahmdabad,WestBengal,India,2022-08-13,True',
 '3,customer_3,jaipur,Rajastha,India,2021-09-18,False',
 '4,customer_4,Delhi,Haryana,India,2023-04-25,False',
 '5,customer_5,Pune,Maharashtra,India,2022-05-28,False']

In [86]:
def parse_row(row):
    fields=row.split(',')
    return(
        int(fields[0]),
        fields[1],
        fields[2],
        fields[3],
        fields[4],
        fields[5],
        fields[6]=='True'
    )
        


In [87]:
parsed_rdd=data_rdd.map(parse_row)

In [88]:
parsed_rdd.collect()

[(0, 'customer_0', 'banglore', 'karnataka', 'India', '2023-11-11', True),
 (1, 'customer_1', 'Hyderabad', 'Telangana', 'India', '2023-08-26', True),
 (2, 'customer_2', 'Ahmdabad', 'WestBengal', 'India', '2022-08-13', True),
 (3, 'customer_3', 'jaipur', 'Rajastha', 'India', '2021-09-18', False),
 (4, 'customer_4', 'Delhi', 'Haryana', 'India', '2023-04-25', False),
 (5, 'customer_5', 'Pune', 'Maharashtra', 'India', '2022-05-28', False)]

## Advance RDD operations

### Extract a fiels with map()-City and state


In [89]:
name_city_rdd=parsed_rdd.map(lambda row:(row[1],row[2]))
name_city_rdd.first()

('customer_0', 'banglore')

 ### Filter iout active customers

In [90]:
active_customers=parsed_rdd.filter(lambda row:row[6]==True)

In [91]:
active_customers.collect()

[(0, 'customer_0', 'banglore', 'karnataka', 'India', '2023-11-11', True),
 (1, 'customer_1', 'Hyderabad', 'Telangana', 'India', '2023-08-26', True),
 (2, 'customer_2', 'Ahmdabad', 'WestBengal', 'India', '2022-08-13', True)]

In [92]:
#distinct()-Transformation

In [93]:
cities_rdd=parsed_rdd.map(lambda row:row[2]).distinct()

In [94]:
cities_rdd.collect()

['banglore', 'Hyderabad', 'Ahmdabad', 'Delhi', 'Pune', 'jaipur']

### take()

In [95]:
cities_rdd.take(2)

['banglore', 'Hyderabad']


## Reduce by key(transformation)
### combines values for each key using an associative reduce function

### we can chain the function together

In [96]:
customer_per_city=parsed_rdd.map(lambda row:(row[2],1)).reduceByKey(lambda x,y:x+y)

In [97]:
customer_per_city.collect()

[('Ahmdabad', 1),
 ('Delhi', 1),
 ('Pune', 1),
 ('banglore', 1),
 ('Hyderabad', 1),
 ('jaipur', 1)]

### count ByValue(action)

In [98]:
customer_per_city=parsed_rdd.map(lambda row:row[2]).countByValue()

In [99]:
customer_per_city

defaultdict(int,
            {'banglore': 1,
             'Hyderabad': 1,
             'Ahmdabad': 1,
             'jaipur': 1,
             'Delhi': 1,
             'Pune': 1})

### Combine more operations

In [100]:
#cities with active customers
active_cities= parsed_rdd.filter(lambda row:row[6])\
                          .map(lambda row:row[2]) \
                          .distinct()
active_cities.collect()

['Ahmdabad', 'banglore', 'Hyderabad']

In [104]:
spark.stop()