In [1]:
from pyspark.sql import SparkSession

# Build the "PartitioningDemo" session against YARN; getOrCreate() returns
# the already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("PartitioningDemo")
    .master("yarn")
    .getOrCreate()
)

spark


25/06/29 20:04:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Sample lines of space-separated names used as in-memory input.
data = [
    "Goku Vegeta Gohan",
    "Goku Frieza Goku",
    "Vegeta Goku Gohan Frieza",
    "Gohan Frieza Goku Goku",
]

In [5]:
rdd = spark.sparkContext.parallelize(data, 2)

In [6]:
rdd.getNumPartitions()

2

In [7]:
hdfs_path = "/tmp/names.txt"

rdd1 = spark.sparkContext.textFile(hdfs_path)

In [8]:
rdd1.getNumPartitions()

2

- Even though the file is smaller than the HDFS block size, we still get 2 partitions because `textFile` defaults its `minPartitions` to `min(defaultParallelism, 2)`.

In [9]:
# Default Parallelism
print(f"Default parallelism: {spark.sparkContext.defaultParallelism}")

Default parallelism: 2


In [10]:
rdd1 = rdd1.repartition(6)

rdd1.getNumPartitions()

6

In [37]:
spark.stop()

# Stopping the Spark session ends the application and its jobs. They appear in the History Server only after the session has been stopped.

# Restart Kernel

In [1]:
from pyspark.sql import SparkSession

# Build the "RDD_Operations" session against YARN; getOrCreate() returns
# the already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("RDD_Operations")
    .master("yarn")
    .getOrCreate()
)

spark


25/06/29 21:51:18 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
customer_data = [
    "customer_id,name,city,state,country,registration_date,is_active",
    "0,Customer_0,Pune,Maharashtra,India,2023-06-29,False",
    "1,Customer_1,Bangalore,Tamil Nadu,India,2023-12-07,True",
    "2,Customer_2,Hyderabad,Gujarat,India,2023-10-27,True",
    "3,Customer_3,Bangalore,Karnataka,India,2023-10-17,False",
    "4,Customer_4,Ahmedabad,Karnataka,India,2023-03-14,False",
    "5,Customer_5,Hyderabad,Karnataka,India,2023-07-28,False"
]

In [3]:
data_rdd = spark.sparkContext.parallelize(customer_data)

In [4]:
data_rdd.collect()

                                                                                

['customer_id,name,city,state,country,registration_date,is_active',
 '0,Customer_0,Pune,Maharashtra,India,2023-06-29,False',
 '1,Customer_1,Bangalore,Tamil Nadu,India,2023-12-07,True',
 '2,Customer_2,Hyderabad,Gujarat,India,2023-10-27,True',
 '3,Customer_3,Bangalore,Karnataka,India,2023-10-17,False',
 '4,Customer_4,Ahmedabad,Karnataka,India,2023-03-14,False',
 '5,Customer_5,Hyderabad,Karnataka,India,2023-07-28,False']

In [5]:
# first() --> Action

header = data_rdd.first()
print(header)

[Stage 1:>                                                          (0 + 1) / 1]

customer_id,name,city,state,country,registration_date,is_active


                                                                                

In [6]:
# filter() --> Transformation
# Keep every line except the one equal to the header captured earlier.
data_rdd = data_rdd.filter(lambda line: line != header)

# collect() is the action that brings the filtered rows back to the driver.
print(data_rdd.collect())

['0,Customer_0,Pune,Maharashtra,India,2023-06-29,False', '1,Customer_1,Bangalore,Tamil Nadu,India,2023-12-07,True', '2,Customer_2,Hyderabad,Gujarat,India,2023-10-27,True', '3,Customer_3,Bangalore,Karnataka,India,2023-10-17,False', '4,Customer_4,Ahmedabad,Karnataka,India,2023-03-14,False', '5,Customer_5,Hyderabad,Karnataka,India,2023-07-28,False']


                                                                                

# Map functions

In [7]:
def parse_data(row):
    """Convert one comma-separated customer line into a typed tuple.

    Args:
        row: A string with at least seven comma-separated fields, in the
            order customer_id, name, city, state, country,
            registration_date, is_active. Fields beyond the seventh are
            ignored.

    Returns:
        A 7-tuple ``(customer_id, name, city, state, country,
        registration_date, is_active)``. ``customer_id`` is an ``int``,
        ``is_active`` is a ``bool`` that is ``True`` only when the seventh
        field is exactly the text ``'True'``, and the other fields are
        returned as the original strings.

    Raises:
        ValueError: If the first field is not a valid integer literal.
        IndexError: If the row has fewer than seven fields.
    """
    parts = row.split(',')
    customer_id = int(parts[0])
    # Exact, case-sensitive match: any other text (e.g. 'False') yields False.
    is_active = parts[6] == 'True'
    return (customer_id, *parts[1:6], is_active)


In [8]:
test_data = "1,Customer_1,Bangalore,Tamil Nadu,India,2023-12-07,True"
print(test_data.split(','))

['1', 'Customer_1', 'Bangalore', 'Tamil Nadu', 'India', '2023-12-07', 'True']


In [9]:
parsed_rdd = data_rdd.map(parse_data)
parsed_rdd.collect()

[(0, 'Customer_0', 'Pune', 'Maharashtra', 'India', '2023-06-29', False),
 (1, 'Customer_1', 'Bangalore', 'Tamil Nadu', 'India', '2023-12-07', True),
 (2, 'Customer_2', 'Hyderabad', 'Gujarat', 'India', '2023-10-27', True),
 (3, 'Customer_3', 'Bangalore', 'Karnataka', 'India', '2023-10-17', False),
 (4, 'Customer_4', 'Ahmedabad', 'Karnataka', 'India', '2023-03-14', False),
 (5, 'Customer_5', 'Hyderabad', 'Karnataka', 'India', '2023-07-28', False)]

In [10]:
# Keep only the name (index 1) and city (index 2) of each parsed record.
name_city_rdd = parsed_rdd.map(lambda record: (record[1], record[2]))
name_city_rdd.collect()

[('Customer_0', 'Pune'),
 ('Customer_1', 'Bangalore'),
 ('Customer_2', 'Hyderabad'),
 ('Customer_3', 'Bangalore'),
 ('Customer_4', 'Ahmedabad'),
 ('Customer_5', 'Hyderabad')]

In [11]:
cities_rdd = parsed_rdd.map(lambda row : row[2]).distinct()
cities_rdd.take(1)

# take(n) returns the first n records, where n is the number passed in

                                                                                

['Hyderabad']

In [12]:
# Reduce By Key

# Count of customers per city: emit a (city, 1) pair for each record,
# then add up the ones that share the same city key.
customers_rdd = (
    parsed_rdd
    .map(lambda record: (record[2], 1))
    .reduceByKey(lambda total, count: total + count)
)
print(customers_rdd.collect())

[('Pune', 1), ('Hyderabad', 2), ('Ahmedabad', 1), ('Bangalore', 2)]


In [13]:
cities_rdd.collect()

['Hyderabad', 'Pune', 'Ahmedabad', 'Bangalore']

In [14]:
parsed_rdd.map(lambda row : row[2]).countByValue()

# countByValue() is an action, which is why we don't call .collect() on it.
# reduceByKey() is a transformation, so other operations can be chained after it,
# whereas an action is the last step and no further RDD operations can follow it.

defaultdict(int, {'Pune': 1, 'Bangalore': 2, 'Hyderabad': 2, 'Ahmedabad': 1})

In [15]:
parsed_rdd.map(lambda row : (row[2],1)).reduceByKey(lambda x,y:x+y)

PythonRDD[21] at RDD at PythonRDD.scala:53

In [16]:
# Distinct cities that have at least one active customer.
# parse_data already stores is_active (index 6) as a bool, so the flag is
# used directly as the filter predicate instead of being compared with
# `== True`.
active_cities = (
    parsed_rdd
    .filter(lambda record: record[6])
    .map(lambda record: record[2])
    .distinct()
)
active_cities.collect()

['Hyderabad', 'Bangalore']

In [18]:
# Count of customers per state: emit a (state, 1) pair for each record,
# then add up the ones that share the same state key.
customer_by_state = (
    parsed_rdd
    .map(lambda record: (record[3], 1))
    .reduceByKey(lambda total, count: total + count)
)
print(customer_by_state.collect())

[('Maharashtra', 1), ('Tamil Nadu', 1), ('Gujarat', 1), ('Karnataka', 3)]


In [19]:
spark.stop()