In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()

 # Creating RDD by Using Parallelize() from a Python List

In [2]:
l = [1, 2, 3, 4, 5, 6, 7]
print(type(l))
print(l)
rdd_1 = spark.sparkContext.parallelize(l)
print(type(rdd_1))
print(rdd_1.collect())

<class 'list'>
[1, 2, 3, 4, 5, 6, 7]
<class 'pyspark.rdd.RDD'>
[1, 2, 3, 4, 5, 6, 7]


# Creating RDD by using textFile() from a text or csv file

In [3]:
rdd_2 = spark.sparkContext.textFile(r"C:\Users\tharu\Project_Git_Push\New_Tharun_Branch\Tharun_Projects\Sample_Data\data_1.csv")
print(type(rdd_2))
print(rdd_2.collect())

<class 'pyspark.rdd.RDD'>
['CustomerID,FirstName,LastName,Email,PhoneNumber,Address,City,State,PostalCode,Country,DateOfBirth', '1,John,Doe,john.doe@example.com,555-1234,123 Elm St,Springfield,IL,62704,USA,1985-03-15', '2,Jane,Smith,jane.smith@example.com,555-5678,456 Oak St,Columbus,OH,43215,USA,1990-06-20', '3,Bob,Johnson,bob.johnson@example.com,555-9876,789 Pine St,Dallas,TX,75201,USA,1982-11-05', '4,Alice,Williams,alice.williams@example.com,555-2468,321 Maple St,Seattle,WA,98101,USA,1988-09-10', '5,Michael,Brown,michael.brown@example.com,555-1357,654 Cedar St,San Francisco,CA,94102,USA,1975-12-25', '6,Emily,Garcia,emily.garcia@example.com,555-3698,987 Birch St,Miami,FL,33101,USA,1995-04-10', '7,David,Martinez,david.martinez@example.com,555-7412,258 Ash St,Denver,CO,80201,USA,1987-07-23', '8,Olivia,Thompson,olivia.thompson@example.com,555-8529,753 Palm St,Phoenix,AZ,85001,USA,1993-02-14', '9,Daniel,Anderson,daniel.anderson@example.com,555-3692,125 Willow St,New York,NY,10001,USA,198

# creating RDD from a RDD

In [4]:
rdd_3 = rdd_1.filter(lambda x: x >= 2)
print(rdd_3.collect())
print(type(rdd_3))

[2, 3, 4, 5, 6, 7]
<class 'pyspark.rdd.PipelinedRDD'>


In [5]:
df = spark.read.csv(r"C:\Users\tharu\Project_Git_Push\New_Tharun_Branch\Tharun_Projects\Sample_Data\data_1.csv", header=True, inferSchema=True)
rdd_4 = df.rdd
print(type(rdd_4))
print(rdd_4)
print(rdd_4.collect())


<class 'pyspark.rdd.RDD'>
MapPartitionsRDD[18] at javaToPython at NativeMethodAccessorImpl.java:0
[Row(CustomerID=1, FirstName='John', LastName='Doe', Email='john.doe@example.com', PhoneNumber='555-1234', Address='123 Elm St', City='Springfield', State='IL', PostalCode=62704, Country='USA', DateOfBirth=datetime.date(1985, 3, 15)), Row(CustomerID=2, FirstName='Jane', LastName='Smith', Email='jane.smith@example.com', PhoneNumber='555-5678', Address='456 Oak St', City='Columbus', State='OH', PostalCode=43215, Country='USA', DateOfBirth=datetime.date(1990, 6, 20)), Row(CustomerID=3, FirstName='Bob', LastName='Johnson', Email='bob.johnson@example.com', PhoneNumber='555-9876', Address='789 Pine St', City='Dallas', State='TX', PostalCode=75201, Country='USA', DateOfBirth=datetime.date(1982, 11, 5)), Row(CustomerID=4, FirstName='Alice', LastName='Williams', Email='alice.williams@example.com', PhoneNumber='555-2468', Address='321 Maple St', City='Seattle', State='WA', PostalCode=98101, Country=

In [6]:
empty_rdd = spark.sparkContext.emptyRDD()
print(type(empty_rdd))

<class 'pyspark.rdd.RDD'>


# Map
Purpose: Transforms each element in the RDD using a function you provide.

How it works: For each element in the RDD, the map function applies the given function and returns a new RDD with each element transformed.

Output: The resulting RDD will have the same number of elements as the input RDD.

In [7]:
x = ['hello', 'world', 'watsapp']
print(x)
rdd_4 = spark.sparkContext.parallelize(x)
rdd_5 = rdd_4.map(lambda x: x.upper())

print(rdd_5.collect())

['hello', 'world', 'watsapp']
['HELLO', 'WORLD', 'WATSAPP']


# flatmap
Purpose: Transforms each element in the RDD using a function you provide and then flattens the results.

How it works: For each element in the RDD, the flatMap function applies the given function, which should return an iterable. The results are then flattened into a single RDD.

Output: The resulting RDD will contain the flattened elements from the iterables returned by the function. This means the output RDD might have more or fewer elements than the input RDD.

In [8]:
rdd_6 = rdd_4.flatMap(lambda x: x.upper())
print(rdd_6.collect())

['H', 'E', 'L', 'L', 'O', 'W', 'O', 'R', 'L', 'D', 'W', 'A', 'T', 'S', 'A', 'P', 'P']


In [9]:
filter_rdd = rdd_1.filter(lambda x: x%2 == 0)
print(filter_rdd.collect())

[2, 4, 6]


In [10]:
a = [[1, 2, 3, 4], [6, 8, 9, 7, 5], [12, 13, 14]]  

# Create an RDD from the list
rdd_flatmap = spark.sparkContext.parallelize(a)

# Flatten the list and filter only even numbers
# Lambda function flattens the list and filters even numbers
fm_rdd = rdd_flatmap.flatMap(lambda x: x).filter(lambda y: y % 2 == 0)

# Collect the results and print them
print(fm_rdd.collect())


[2, 4, 6, 8, 12, 14]


In [11]:
a = "hello world..!!, Very good morning"
b = a.split(" ")
print(b)

['hello', 'world..!!,', 'Very', 'good', 'morning']


# GroupBy

In [12]:
g = [(20, 2000), (25, 3000), (20, 5000), (30, 4000), (25, 2000)]
g_rdd = spark.sparkContext.parallelize(g)
g_rdd_1 = g_rdd.groupByKey()
print(g_rdd_1.collect())
g_rdd_2 = g_rdd_1.mapValues(lambda x: sum(x))
print(g_rdd_2.collect())

[(25, <pyspark.resultiterable.ResultIterable object at 0x000001B042A2AE10>), (20, <pyspark.resultiterable.ResultIterable object at 0x000001B042A40790>), (30, <pyspark.resultiterable.ResultIterable object at 0x000001B042A2B750>)]
[(25, 5000), (20, 7000), (30, 4000)]


In [13]:
g_rdd_3 = g_rdd.reduceByKey(lambda x,y : x+y)
print(g_rdd_3.collect())

[(25, 5000), (20, 7000), (30, 4000)]


In [14]:
text_rdd = spark.sparkContext.textFile(r"C:\Users\tharu\Project_Git_Push\New_Tharun_Branch\Tharun_Projects\Sample_Data\Text_file.txt")
rdd1 = text_rdd.flatMap(lambda x : x.split(" "))
print(rdd1.collect())

['To', 'test', 'a', 'sample', 'data', 'for', 'test', 'we', 'need', 'to', 'test', 'the', 'data', 'by', 'testing', 'data', 'with', 'data,', 'data', 'about', 'data', 'is', 'called', 'metadata', '.', 'reed', 'the', 'data', 'to', 'understand', 'the', 'data', 'and', 'the', 'type', 'of', 'data.']


In [17]:
rdd2 = rdd1.map(lambda x : (x,1))
print(rdd2.collect())

[('To', 1), ('test', 1), ('a', 1), ('sample', 1), ('data', 1), ('for', 1), ('test', 1), ('we', 1), ('need', 1), ('to', 1), ('test', 1), ('the', 1), ('data', 1), ('by', 1), ('testing', 1), ('data', 1), ('with', 1), ('data,', 1), ('data', 1), ('about', 1), ('data', 1), ('is', 1), ('called', 1), ('metadata', 1), ('.', 1), ('reed', 1), ('the', 1), ('data', 1), ('to', 1), ('understand', 1), ('the', 1), ('data', 1), ('and', 1), ('the', 1), ('type', 1), ('of', 1), ('data.', 1)]


In [19]:
rdd3 = rdd2.reduceByKey(lambda x,y : x+y)
print(rdd3.collect())

[('To', 1), ('sample', 1), ('for', 1), ('to', 2), ('by', 1), ('testing', 1), ('with', 1), ('data,', 1), ('about', 1), ('called', 1), ('metadata', 1), ('.', 1), ('reed', 1), ('and', 1), ('type', 1), ('of', 1), ('data.', 1), ('test', 3), ('a', 1), ('data', 7), ('we', 1), ('need', 1), ('the', 4), ('is', 1), ('understand', 1)]
