In [1]:
# Import SparkContext from PySpark
from pyspark.context import SparkContext

# Create a SparkContext with an application name
sc = SparkContext(appName="Spark-RDD")

In [2]:
# Display the version of Spark and Python
sc.version,sc.pythonVer

('3.5.3', '3.10')

In [3]:
# Display the application name and application ID
sc.appName,sc.applicationId

('Spark-RDD', 'local-1731481099203')

In [4]:
# Check the default number of parallel tasks (partitions)
sc.defaultParallelism

2

In [6]:
# Create an RDD with integers 1 through 6, divided into two partitions
rdd1=sc.parallelize([1,2,3,4,5,6])

In [7]:
rdd1.collect()

[1, 2, 3, 4, 5, 6]

In [8]:
rdd1.getNumPartitions()

2

In [9]:
# Print min, max, sum, and mean of values in rdd1
print(f' Min value :{rdd1.min()}\n Max Value : {rdd1.max()} \n sum of values :{rdd1.sum()}\n mean of value :{rdd1.mean()}')

 Min value :1
 Max Value : 6 
 sum of values :21
 mean of value :3.5


In [10]:
# Define a lambda function to square each element
square =lambda x:(x,x**2)

In [11]:
# Apply map transformation to rdd1 to create a new RDD with tuples (value, value^2)
rdd2=rdd1.map(square)

In [12]:
rdd2.collect()

[(1, 1), (2, 4), (3, 9), (4, 16), (5, 25), (6, 36)]

In [13]:
# Apply flatMap transformation to rdd2, which flattens the tuples into individual values
rdd3=rdd2.flatMap(lambda x:x)
rdd3.collect()

[1, 1, 2, 4, 3, 9, 4, 16, 5, 25, 6, 36]

In [14]:
# Get distinct values from rdd3
rdd3.distinct().collect()

[2, 4, 16, 6, 36, 1, 3, 9, 5, 25]

In [16]:
even_value=lambda x:x%2==0
# Filter rdd3 to keep only even numbers
rdd3.filter(even_value).collect()

[2, 4, 4, 16, 6, 36]

In [17]:
rdd3.filter(lambda x:x%2==0 and x>10).collect()

[16, 36]

In [18]:
rdd3.getNumPartitions()

2

In [19]:
# Use glom() to group data in each partition into a list
rdd3.glom().collect()

[[1, 1, 2, 4, 3, 9], [4, 16, 5, 25, 6, 36]]

In [20]:
indian_cars = [
    "Maruti Suzuki Alto", "Tata Tiago", "Hyundai i10", "Renault Kwid", "Maruti Suzuki Swift",
    "Honda City", "Maruti Suzuki Ciaz", "Hyundai Verna", "Tata Tigor", "Skoda Slavia",
    "Mahindra Scorpio", "Tata Harrier", "Maruti Suzuki Brezza", "Hyundai Creta", "Kia Seltos"
]

In [21]:
# Parallelize the car list into an RDD with 4 partitions
indian_cars_rdd=sc.parallelize(indian_cars,4)

In [22]:
indian_cars_rdd.getNumPartitions()

4

In [23]:
# Use glom() to view the data in each partition
for i in indian_cars_rdd.glom().collect():
    print(i)

['Maruti Suzuki Alto', 'Tata Tiago', 'Hyundai i10']
['Renault Kwid', 'Maruti Suzuki Swift', 'Honda City']
['Maruti Suzuki Ciaz', 'Hyundai Verna', 'Tata Tigor']
['Skoda Slavia', 'Mahindra Scorpio', 'Tata Harrier', 'Maruti Suzuki Brezza', 'Hyundai Creta', 'Kia Seltos']


In [24]:
# Repartition the RDD into 6 partitions
indian_cars_rdd1=indian_cars_rdd.repartition(6)

In [25]:
indian_cars_rdd1.getNumPartitions()

6

In [26]:
# View data in each partition after repartitioning
for i in indian_cars_rdd1.glom().collect():
    print(i)

['Skoda Slavia', 'Mahindra Scorpio', 'Tata Harrier', 'Maruti Suzuki Brezza', 'Hyundai Creta', 'Kia Seltos']
[]
[]
['Maruti Suzuki Alto', 'Tata Tiago', 'Hyundai i10']
['Renault Kwid', 'Maruti Suzuki Swift', 'Honda City']
['Maruti Suzuki Ciaz', 'Hyundai Verna', 'Tata Tigor']


In [27]:
# Repartition the RDD into 3 partitions and view the result using glom()
indian_cars_rdd.repartition(3).glom().collect()

[['Maruti Suzuki Alto',
  'Tata Tiago',
  'Hyundai i10',
  'Skoda Slavia',
  'Mahindra Scorpio',
  'Tata Harrier',
  'Maruti Suzuki Brezza',
  'Hyundai Creta',
  'Kia Seltos'],
 ['Renault Kwid', 'Maruti Suzuki Swift', 'Honda City'],
 ['Maruti Suzuki Ciaz', 'Hyundai Verna', 'Tata Tigor']]

In [28]:
rdd3.collect()

[1, 1, 2, 4, 3, 9, 4, 16, 5, 25, 6, 36]

In [29]:
# Perform a reduce operation on rdd3 to sum all the elements
result = rdd3.reduce(lambda a, b: a + b)
print(type(result),result)

<class 'int'> 112


In [30]:
# Creating a key-value RDD
pairs_rdd = sc.parallelize([("Maruti Suzuki Alto", 3), ("Hyundai i10", 1), ("Hyundai i10", 2), ("Tata Harrier", 2),("Maruti Suzuki Alto", 10),("Hyundai i10", 15)])

In [31]:
# Reduce by key to sum values for each key
reduceByKey = pairs_rdd.reduceByKey(lambda a, b: a + b)
print(type(reduceByKey),reduceByKey.collect())

<class 'pyspark.rdd.PipelinedRDD'> [('Tata Harrier', 2), ('Maruti Suzuki Alto', 13), ('Hyundai i10', 18)]


In [32]:
# Group by key
grouped = pairs_rdd.groupByKey().mapValues(list)
print(type(grouped),grouped.collect())

<class 'pyspark.rdd.PipelinedRDD'> [('Tata Harrier', [2]), ('Maruti Suzuki Alto', [3, 10]), ('Hyundai i10', [1, 2, 15])]


In [33]:
states = {"NY":"New york", "CA":"California", "FL":"Florida"}
print(states.items())
print(states.keys())

dict_items([('NY', 'New york'), ('CA', 'California'), ('FL', 'Florida')])
dict_keys(['NY', 'CA', 'FL'])


In [34]:
# Parallelize the dictionary into an RDD
states_rdd = sc.parallelize(states.items())
print(states_rdd.collect(),type(states_rdd.collect()))

[('NY', 'New york'), ('CA', 'California'), ('FL', 'Florida')] <class 'list'>


In [35]:
data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]
data_rdd = sc.parallelize(data)

In [36]:
data_rdd.collect()

[('James', 'Smith', 'USA', 'CA'),
 ('Michael', 'Rose', 'USA', 'NY'),
 ('Robert', 'Williams', 'USA', 'CA'),
 ('Maria', 'Jones', 'USA', 'FL')]

In [37]:
# Map the RDD to create a key-value RDD where the state code is the key
rdd_keyed = data_rdd.map(lambda x: (x[3], x))
rdd_keyed.collect()

[('CA', ('James', 'Smith', 'USA', 'CA')),
 ('NY', ('Michael', 'Rose', 'USA', 'NY')),
 ('CA', ('Robert', 'Williams', 'USA', 'CA')),
 ('FL', ('Maria', 'Jones', 'USA', 'FL'))]

In [39]:
# Perform a join operation with states_rdd to add state names
joined_rdd = rdd_keyed.join(states_rdd)

In [40]:
joined_rdd.collect()

[('CA', (('James', 'Smith', 'USA', 'CA'), 'California')),
 ('CA', (('Robert', 'Williams', 'USA', 'CA'), 'California')),
 ('NY', (('Michael', 'Rose', 'USA', 'NY'), 'New york')),
 ('FL', (('Maria', 'Jones', 'USA', 'FL'), 'Florida'))]

In [41]:
joined_rdd.map(lambda x: (*x[1][0][:-1],x[1][1])).collect()

[('James', 'Smith', 'USA', 'California'),
 ('Robert', 'Williams', 'USA', 'California'),
 ('Michael', 'Rose', 'USA', 'New york'),
 ('Maria', 'Jones', 'USA', 'Florida')]

In [42]:
# Broadcast the states dictionary to optimize lookup during transformations
broadcastStates = sc.broadcast(states)
# Create a function that retrieves the full state name based on the state code
def state_convert(code):
    return broadcastStates.value[code]

# Map the data_rdd to replace the state code with the full state name
result = data_rdd.map(lambda x: (*x[:2],state_convert(x[3])))

In [43]:
result.collect()

[('James', 'Smith', 'California'),
 ('Michael', 'Rose', 'New york'),
 ('Robert', 'Williams', 'California'),
 ('Maria', 'Jones', 'Florida')]

In [44]:
# To convert the RDD into a DataFrame, we first need to initialize a SparkSession
from pyspark.sql import SparkSession
spark= SparkSession.builder.getOrCreate()

In [45]:
sc.applicationId,spark.sparkContext.applicationId

('local-1731481099203', 'local-1731481099203')

In [46]:
# Convert the RDD to a DataFrame
result.toDF().collect()

[Row(_1='James', _2='Smith', _3='California'),
 Row(_1='Michael', _2='Rose', _3='New york'),
 Row(_1='Robert', _2='Williams', _3='California'),
 Row(_1='Maria', _2='Jones', _3='Florida')]

In [47]:
result.toDF().show()

+-------+--------+----------+
|     _1|      _2|        _3|
+-------+--------+----------+
|  James|   Smith|California|
|Michael|    Rose|  New york|
| Robert|Williams|California|
|  Maria|   Jones|   Florida|
+-------+--------+----------+



In [48]:
rdd_df=result.toDF(['first_name','last_name','state'])

In [49]:
rdd_df.collect()

[Row(first_name='James', last_name='Smith', state='California'),
 Row(first_name='Michael', last_name='Rose', state='New york'),
 Row(first_name='Robert', last_name='Williams', state='California'),
 Row(first_name='Maria', last_name='Jones', state='Florida')]

In [50]:
rdd_df.show()

+----------+---------+----------+
|first_name|last_name|     state|
+----------+---------+----------+
|     James|    Smith|California|
|   Michael|     Rose|  New york|
|    Robert| Williams|California|
|     Maria|    Jones|   Florida|
+----------+---------+----------+



In [51]:
rdd_df.printSchema()

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- state: string (nullable = true)



In [52]:
rdd_df.dtypes

[('first_name', 'string'), ('last_name', 'string'), ('state', 'string')]

In [53]:
print(type(rdd_df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [56]:
ids=range(1,10)

In [57]:
spark.createDataFrame(ids,'int').show()

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
+-----+



In [58]:
data = [
    (10, "Aarav"),
    (11, "Vivaan"),
    (12, "Aditya"),
    (13, "Diya"),
    (14, "Anaya"),
    (15, "Ishaan"),
    (16, "Meera"),
    (17, "Rohan"),
    (18, "Lakshmi"),
    (19, "Neha")
]

In [59]:
df=spark.createDataFrame(data)

In [60]:
df.show()

+---+-------+
| _1|     _2|
+---+-------+
| 10|  Aarav|
| 11| Vivaan|
| 12| Aditya|
| 13|   Diya|
| 14|  Anaya|
| 15| Ishaan|
| 16|  Meera|
| 17|  Rohan|
| 18|Lakshmi|
| 19|   Neha|
+---+-------+



In [61]:
df.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)



In [62]:
df1=spark.createDataFrame(data,'id string , name string')

In [63]:
df1.show()

+---+-------+
| id|   name|
+---+-------+
| 10|  Aarav|
| 11| Vivaan|
| 12| Aditya|
| 13|   Diya|
| 14|  Anaya|
| 15| Ishaan|
| 16|  Meera|
| 17|  Rohan|
| 18|Lakshmi|
| 19|   Neha|
+---+-------+



In [64]:
df1.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



In [65]:
data_dict=[
    {'user_id': 10, 'name': 'Aarav'},
    {'user_id': 11, 'name': 'Vivaan'},
    {'user_id': 12, 'name': 'Aditya'},
    {'user_id': 13, 'name': 'Diya'},
    {'user_id': 14, 'name': 'Anaya'},
    {'user_id': 15, 'name': 'Ishaan'},
    {'user_id': 16, 'name': 'Meera'},
    {'user_id': 17, 'name': 'Rohan'},
    {'user_id': 18, 'name': 'Lakshmi'},
    {'user_id': 19, 'name': 'Neha'}
]

In [66]:
[i.values() for i in data_dict]

[dict_values([10, 'Aarav']),
 dict_values([11, 'Vivaan']),
 dict_values([12, 'Aditya']),
 dict_values([13, 'Diya']),
 dict_values([14, 'Anaya']),
 dict_values([15, 'Ishaan']),
 dict_values([16, 'Meera']),
 dict_values([17, 'Rohan']),
 dict_values([18, 'Lakshmi']),
 dict_values([19, 'Neha'])]

In [67]:
spark.createDataFrame(
    [tuple(i.values()) for i in data_dict]
    ,'user_id string,user_name string'
    ).show()

+-------+---------+
|user_id|user_name|
+-------+---------+
|     10|    Aarav|
|     11|   Vivaan|
|     12|   Aditya|
|     13|     Diya|
|     14|    Anaya|
|     15|   Ishaan|
|     16|    Meera|
|     17|    Rohan|
|     18|  Lakshmi|
|     19|     Neha|
+-------+---------+



In [68]:
from pyspark.sql import Row

In [69]:
Row((1,2))

<Row((1, 2))>

In [70]:
Row(data_dict[5])

<Row({'user_id': 15, 'name': 'Ishaan'})>

In [71]:
[Row(*i.values()) for i in data_dict]

[<Row(10, 'Aarav')>,
 <Row(11, 'Vivaan')>,
 <Row(12, 'Aditya')>,
 <Row(13, 'Diya')>,
 <Row(14, 'Anaya')>,
 <Row(15, 'Ishaan')>,
 <Row(16, 'Meera')>,
 <Row(17, 'Rohan')>,
 <Row(18, 'Lakshmi')>,
 <Row(19, 'Neha')>]

In [72]:
spark.createDataFrame([Row(*i.values()) for i in data_dict],'id int , name string').show()

+---+-------+
| id|   name|
+---+-------+
| 10|  Aarav|
| 11| Vivaan|
| 12| Aditya|
| 13|   Diya|
| 14|  Anaya|
| 15| Ishaan|
| 16|  Meera|
| 17|  Rohan|
| 18|Lakshmi|
| 19|   Neha|
+---+-------+



In [73]:
[Row(**i) for i in data_dict]

[Row(user_id=10, name='Aarav'),
 Row(user_id=11, name='Vivaan'),
 Row(user_id=12, name='Aditya'),
 Row(user_id=13, name='Diya'),
 Row(user_id=14, name='Anaya'),
 Row(user_id=15, name='Ishaan'),
 Row(user_id=16, name='Meera'),
 Row(user_id=17, name='Rohan'),
 Row(user_id=18, name='Lakshmi'),
 Row(user_id=19, name='Neha')]

In [74]:
spark.createDataFrame([Row(*i.values()) for i in data_dict],'id int , name string').show()

+---+-------+
| id|   name|
+---+-------+
| 10|  Aarav|
| 11| Vivaan|
| 12| Aditya|
| 13|   Diya|
| 14|  Anaya|
| 15| Ishaan|
| 16|  Meera|
| 17|  Rohan|
| 18|Lakshmi|
| 19|   Neha|
+---+-------+

