In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CC3").getOrCreate()


In [0]:
data = [
    (1, "Ravi", "Math", 85, "Mumbai"),
    (2, "Priya", "Physics", 90, "Delhi"),
    (3, "Suresh", "Chemistry", 78, "Bangalore"),
    (4, "Anita", "Math", 92, "Kolkata"),
    (5, "Raj", "Physics", 88, "Chennai")
]
columns = ["id", "name", "subject", "score"]
df = spark.createDataFrame(data, columns)

In [0]:
print("Original DataFrame:")
df.show()

Original DataFrame:
+---+------+---------+-----+---------+
| id|  name|  subject|score|       _5|
+---+------+---------+-----+---------+
|  1|  Ravi|     Math|   85|   Mumbai|
|  2| Priya|  Physics|   90|    Delhi|
|  3|Suresh|Chemistry|   78|Bangalore|
|  4| Anita|     Math|   92|  Kolkata|
|  5|   Raj|  Physics|   88|  Chennai|
+---+------+---------+-----+---------+



In [0]:
from pyspark.sql.functions import col, expr

In [0]:
# Manipulating data in a DataFrame
df = df.withColumn("adjusted_score", col("score") + 5)
print("\nManipulated DataFrame:")
df.show()








Manipulated DataFrame:
+---+------+---------+-----+---------+--------------+
| id|  name|  subject|score|       _5|adjusted_score|
+---+------+---------+-----+---------+--------------+
|  1|  Ravi|     Math|   85|   Mumbai|            90|
|  2| Priya|  Physics|   90|    Delhi|            95|
|  3|Suresh|Chemistry|   78|Bangalore|            83|
|  4| Anita|     Math|   92|  Kolkata|            97|
|  5|   Raj|  Physics|   88|  Chennai|            93|
+---+------+---------+-----+---------+--------------+



In [0]:
# Dropping columns
df = df.drop("score")
df.show()

+---+------+---------+---------+--------------+
| id|  name|  subject|       _5|adjusted_score|
+---+------+---------+---------+--------------+
|  1|  Ravi|     Math|   Mumbai|            90|
|  2| Priya|  Physics|    Delhi|            95|
|  3|Suresh|Chemistry|Bangalore|            83|
|  4| Anita|     Math|  Kolkata|            97|
|  5|   Raj|  Physics|  Chennai|            93|
+---+------+---------+---------+--------------+



In [0]:
# Sorting DataFrame by a column
df = df.orderBy("adjusted_score", ascending=False)
df.show()

+---+------+---------+---------+--------------+
| id|  name|  subject|       _5|adjusted_score|
+---+------+---------+---------+--------------+
|  4| Anita|     Math|  Kolkata|            97|
|  2| Priya|  Physics|    Delhi|            95|
|  5|   Raj|  Physics|  Chennai|            93|
|  1|  Ravi|     Math|   Mumbai|            90|
|  3|Suresh|Chemistry|Bangalore|            83|
+---+------+---------+---------+--------------+



In [0]:
# Aggregations
agg_df = df.groupBy("subject").agg({"adjusted_score": "avg"})
print("\nAggregated DataFrame:")
agg_df.show()


Aggregated DataFrame:
+---------+-------------------+
|  subject|avg(adjusted_score)|
+---------+-------------------+
|     Math|               93.5|
|  Physics|               94.0|
|Chemistry|               83.0|
+---------+-------------------+



In [0]:
# GroupBy operation
grouped_df = df.groupBy("subject").agg({"adjusted_score": "avg"})
print("\nGrouped DataFrame:")
grouped_df.show()


Grouped DataFrame:
+---------+-------------------+
|  subject|avg(adjusted_score)|
+---------+-------------------+
|     Math|               93.5|
|  Physics|               94.0|
|Chemistry|               83.0|
+---------+-------------------+



In [0]:
#Inner Join
student_data = [
    (1, "Ravi", "Mumbai"),
    (2, "Priya", "Delhi"),
    (3, "Suresh", "Bangalore"),
    (4, "Anita", "Kolkata"),
    (5, "Raj", "Chennai"),
    (6, "Divya", "Hyderabad"),
    (7, "Vikas", "Pune"),
    (8, "Pooja", "Ahmedabad"),
    (9, "Kumar", "Jaipur"),
    (10, "Sonia", "Lucknow")
]

course_data = [
    (1, "Math"),
    (2, "Physics"),
    (3, "Chemistry"),
    (4, "History"),
    (5, "Geography"),
    (6, "Computer Science")
]

scores_data = [
    (1, 1, 90),
    (2, 2, 85),
    (3, 3, 92),
    (4, 1, 88),
    (5, 2, 78),
    (6, 3, 95),
    (7, 1, 75),
    (8, 2, 80),
    (9, 3, 87),
    (10, 1, 89)
]

# Create DataFrames for Students, Courses, and Exam Scores
students_df = spark.createDataFrame(student_data, ["student_id", "student_name", "city"])
courses_df = spark.createDataFrame(course_data, ["course_id", "course_name"])
scores_df = spark.createDataFrame(scores_data, ["student_id", "course_id", "score"])

inner_joined_df = students_df.join(scores_df, "student_id", "inner").join(courses_df, "course_id", "inner")
print("\nInner Join Result:")
inner_joined_df.show()


Inner Join Result:
+---------+----------+------------+---------+-----+-----------+
|course_id|student_id|student_name|     city|score|course_name|
+---------+----------+------------+---------+-----+-----------+
|        1|        10|       Sonia|  Lucknow|   89|       Math|
|        1|         7|       Vikas|     Pune|   75|       Math|
|        1|         4|       Anita|  Kolkata|   88|       Math|
|        1|         1|        Ravi|   Mumbai|   90|       Math|
|        2|         8|       Pooja|Ahmedabad|   80|    Physics|
|        2|         5|         Raj|  Chennai|   78|    Physics|
|        2|         2|       Priya|    Delhi|   85|    Physics|
|        3|         9|       Kumar|   Jaipur|   87|  Chemistry|
|        3|         6|       Divya|Hyderabad|   95|  Chemistry|
|        3|         3|      Suresh|Bangalore|   92|  Chemistry|
+---------+----------+------------+---------+-----+-----------+



In [0]:
#Left Outer Join 
left_outer_joined_df = students_df.join(scores_df, "student_id", "left_outer").join(courses_df, "course_id", "left_outer")
print("\nLeft Outer Join Result:")
left_outer_joined_df.show()



Left Outer Join Result:
+---------+----------+------------+---------+-----+-----------+
|course_id|student_id|student_name|     city|score|course_name|
+---------+----------+------------+---------+-----+-----------+
|        1|         1|        Ravi|   Mumbai|   90|       Math|
|        2|         2|       Priya|    Delhi|   85|    Physics|
|        3|         3|      Suresh|Bangalore|   92|  Chemistry|
|        2|         5|         Raj|  Chennai|   78|    Physics|
|        1|         4|       Anita|  Kolkata|   88|       Math|
|        3|         6|       Divya|Hyderabad|   95|  Chemistry|
|        1|         7|       Vikas|     Pune|   75|       Math|
|        2|         8|       Pooja|Ahmedabad|   80|    Physics|
|        3|         9|       Kumar|   Jaipur|   87|  Chemistry|
|        1|        10|       Sonia|  Lucknow|   89|       Math|
+---------+----------+------------+---------+-----+-----------+



In [0]:
#Right Outer Join
right_outer_joined_df = students_df.join(scores_df, "student_id", "right_outer").join(courses_df, "course_id", "right_outer")
print("\nRight Outer Join Result:")
right_outer_joined_df.show()



Right Outer Join Result:
+---------+----------+------------+---------+-----+----------------+
|course_id|student_id|student_name|     city|score|     course_name|
+---------+----------+------------+---------+-----+----------------+
|        1|        10|       Sonia|  Lucknow|   89|            Math|
|        1|         7|       Vikas|     Pune|   75|            Math|
|        1|         4|       Anita|  Kolkata|   88|            Math|
|        1|         1|        Ravi|   Mumbai|   90|            Math|
|        2|         8|       Pooja|Ahmedabad|   80|         Physics|
|        2|         5|         Raj|  Chennai|   78|         Physics|
|        2|         2|       Priya|    Delhi|   85|         Physics|
|        3|         9|       Kumar|   Jaipur|   87|       Chemistry|
|        3|         6|       Divya|Hyderabad|   95|       Chemistry|
|        3|         3|      Suresh|Bangalore|   92|       Chemistry|
|        4|      NULL|        NULL|     NULL| NULL|         History|
|       

In [0]:
#Full Outer Join
full_outer_joined_df = students_df.join(scores_df, "student_id", "full_outer").join(courses_df, "course_id", "full_outer")
print("\nFull Outer Join Result:")
full_outer_joined_df.show()



Full Outer Join Result:
+---------+----------+------------+---------+-----+----------------+
|course_id|student_id|student_name|     city|score|     course_name|
+---------+----------+------------+---------+-----+----------------+
|        1|         1|        Ravi|   Mumbai|   90|            Math|
|        1|         4|       Anita|  Kolkata|   88|            Math|
|        1|         7|       Vikas|     Pune|   75|            Math|
|        1|        10|       Sonia|  Lucknow|   89|            Math|
|        2|         2|       Priya|    Delhi|   85|         Physics|
|        2|         5|         Raj|  Chennai|   78|         Physics|
|        2|         8|       Pooja|Ahmedabad|   80|         Physics|
|        3|         3|      Suresh|Bangalore|   92|       Chemistry|
|        3|         6|       Divya|Hyderabad|   95|       Chemistry|
|        3|         9|       Kumar|   Jaipur|   87|       Chemistry|
|        4|      NULL|        NULL|     NULL| NULL|         History|
|        

In [0]:
# Perform Self Join
self_joined_df = students_df.alias("df1").join(students_df.alias("df2"), "student_id", "inner")
print("\nSelf Join Result:")
self_joined_df.show()


Self Join Result:
+----------+------------+---------+------------+---------+
|student_id|student_name|     city|student_name|     city|
+----------+------------+---------+------------+---------+
|         1|        Ravi|   Mumbai|        Ravi|   Mumbai|
|         2|       Priya|    Delhi|       Priya|    Delhi|
|         3|      Suresh|Bangalore|      Suresh|Bangalore|
|         4|       Anita|  Kolkata|       Anita|  Kolkata|
|         5|         Raj|  Chennai|         Raj|  Chennai|
|         6|       Divya|Hyderabad|       Divya|Hyderabad|
|         7|       Vikas|     Pune|       Vikas|     Pune|
|         8|       Pooja|Ahmedabad|       Pooja|Ahmedabad|
|         9|       Kumar|   Jaipur|       Kumar|   Jaipur|
|        10|       Sonia|  Lucknow|       Sonia|  Lucknow|
+----------+------------+---------+------------+---------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
import pandas as pd

spark = SparkSession.builder.appName("pyspark_functions").getOrCreate()

data = [
    (1, "Sid", "Math", 90),
    (2, "Ani", "Physics", 85),
    (3, "Abhi", "Chemistry", 92),
    (4, "Parth", "Math", 88),
    (5, "Ashmita", "Physics", 78)
]

columns = ["id", "name", "subject", "score"]
df = spark.createDataFrame(data, columns)

print("Original DataFrame:")
df.show()






Original DataFrame:
+---+-------+---------+-----+
| id|   name|  subject|score|
+---+-------+---------+-----+
|  1|    Sid|     Math|   90|
|  2|    Ani|  Physics|   85|
|  3|   Abhi|Chemistry|   92|
|  4|  Parth|     Math|   88|
|  5|Ashmita|  Physics|   78|
+---+-------+---------+-----+



In [0]:
#Simple Function
df = df.withColumn("adjusted_score", col("score") + 5)
print("\nDataFrame after applying a function:")
df.show()




DataFrame after applying a function:
+---+-------+---------+-----+--------------+
| id|   name|  subject|score|adjusted_score|
+---+-------+---------+-----+--------------+
|  1|    Sid|     Math|   90|            95|
|  2|    Ani|  Physics|   85|            90|
|  3|   Abhi|Chemistry|   92|            97|
|  4|  Parth|     Math|   88|            93|
|  5|Ashmita|  Physics|   78|            83|
+---+-------+---------+-----+--------------+



In [0]:
#Complex function
def custom_function(score):
    return score * 2

udf_custom_function = udf(custom_function, IntegerType())

df = df.withColumn("custom_score", udf_custom_function(col("score")))

print("\nDataFrame after applying a UDF:")
df.show()




DataFrame after applying a UDF:
+---+-------+---------+-----+--------------+------------+
| id|   name|  subject|score|adjusted_score|custom_score|
+---+-------+---------+-----+--------------+------------+
|  1|    Sid|     Math|   90|            95|         180|
|  2|    Ani|  Physics|   85|            90|         170|
|  3|   Abhi|Chemistry|   92|            97|         184|
|  4|  Parth|     Math|   88|            93|         176|
|  5|Ashmita|  Physics|   78|            83|         156|
+---+-------+---------+-----+--------------+------------+



In [0]:
# Convert PySpark DataFrame to Pandas DataFrame
pandas_df = df.toPandas()

print("\nPandas DataFrame:")
print(pandas_df)




Pandas DataFrame:
   id     name    subject  score  adjusted_score  custom_score
0   1      Sid       Math     90              95           180
1   2      Ani    Physics     85              90           170
2   3     Abhi  Chemistry     92              97           184
3   4    Parth       Math     88              93           176
4   5  Ashmita    Physics     78              83           156
