#Module 1: Setup & SparkSession Initialization

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BotCampus PySpark Practice").master("local[*]").getOrCreate()


In [2]:
data = [
    ("Anjali", "Bangalore", 24),
    ("Ravi", "Hyderabad", 28),
    ("Kavya", "Delhi", 22),
    ("Meena", "Chennai", 25),
    ("Arjun", "Mumbai", 30)
]

columns = ["name", "city", "age"]

df = spark.createDataFrame(data, columns)
df.show()
df.printSchema()


+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Anjali|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
| Arjun|   Mumbai| 30|
+------+---------+---+

root
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- age: long (nullable = true)



In [4]:
rdd = df.rdd
print(rdd.collect())


[Row(name='Anjali', city='Bangalore', age=24), Row(name='Ravi', city='Hyderabad', age=28), Row(name='Kavya', city='Delhi', age=22), Row(name='Meena', city='Chennai', age=25), Row(name='Arjun', city='Mumbai', age=30)]


In [5]:
rdd_mapped = rdd.map(lambda row: (row.name.upper(), row.city.upper()))
print(rdd_mapped.collect())


[('ANJALI', 'BANGALORE'), ('RAVI', 'HYDERABAD'), ('KAVYA', 'DELHI'), ('MEENA', 'CHENNAI'), ('ARJUN', 'MUMBAI')]


#Module 2: RDDs & Transformations

In [6]:
feedback = spark.sparkContext.parallelize([
    "Ravi from Bangalore loved the delivery",
    "Meena from Hyderabad had a late order",
    "Ajay from Pune liked the service",
    "Anjali from Delhi faced UI issues",
    "Rohit from Mumbai gave positive feedback"
])


In [11]:
#Split each line into words ( flatMap ).
words=feedback.flatMap(lambda line: line.lower().split())
print(words.collect())

#Remove stop words ( from , the , etc.).
stop_words={"from", "the", "a", "had", "an"}
filtered_words=words.filter(lambda word: word not in stop_words)
print(filtered_words.collect())




['ravi', 'from', 'bangalore', 'loved', 'the', 'delivery', 'meena', 'from', 'hyderabad', 'had', 'a', 'late', 'order', 'ajay', 'from', 'pune', 'liked', 'the', 'service', 'anjali', 'from', 'delhi', 'faced', 'ui', 'issues', 'rohit', 'from', 'mumbai', 'gave', 'positive', 'feedback']
['ravi', 'bangalore', 'loved', 'delivery', 'meena', 'hyderabad', 'late', 'order', 'ajay', 'pune', 'liked', 'service', 'anjali', 'delhi', 'faced', 'ui', 'issues', 'rohit', 'mumbai', 'gave', 'positive', 'feedback']


In [12]:
#Count each word frequency using reduceByKey .
word_counts=filtered_words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
print(word_counts.collect())
#Find top 3 most frequent non-stop words.
top_words=word_counts.takeOrdered(3, key=lambda x: -x[1])
print(top_words)

[('loved', 1), ('liked', 1), ('service', 1), ('anjali', 1), ('faced', 1), ('issues', 1), ('rohit', 1), ('mumbai', 1), ('positive', 1), ('feedback', 1), ('ravi', 1), ('bangalore', 1), ('delivery', 1), ('meena', 1), ('hyderabad', 1), ('late', 1), ('order', 1), ('ajay', 1), ('pune', 1), ('delhi', 1), ('ui', 1), ('gave', 1)]
[('loved', 1), ('liked', 1), ('service', 1)]


#Module 3: DataFrames & Transformation (With Joins)

In [13]:
students = [
    ("Amit", "10-A", 89),
    ("Kavya", "10-B", 92),
    ("Anjali", "10-A", 78),
    ("Rohit", "10-B", 85),
    ("Sneha", "10-C", 80)
]
columns=["name", "section", "marks"]
df_students=spark.createDataFrame(students, columns)
df_students.show()


+------+-------+-----+
|  name|section|marks|
+------+-------+-----+
|  Amit|   10-A|   89|
| Kavya|   10-B|   92|
|Anjali|   10-A|   78|
| Rohit|   10-B|   85|
| Sneha|   10-C|   80|
+------+-------+-----+



In [14]:
attendance = [
    ("Amit", 24),
    ("Kavya", 22),
    ("Anjali", 20),
    ("Rohit", 25),
    ("Sneha", 19)
]

columns2=["name", "days_present"]
df_attendance=spark.createDataFrame(attendance, columns2)
df_attendance.show()


+------+------------+
|  name|days_present|
+------+------------+
|  Amit|          24|
| Kavya|          22|
|Anjali|          20|
| Rohit|          25|
| Sneha|          19|
+------+------------+



In [18]:
#Join both DataFrames on name .
joined_df=df_students.join(df_attendance, on="name", how="inner")
joined_df.show()
#Create a new column: attendance_rate = days_present / 25 .
joined_df=joined_df.withColumn("attendance_rate", joined_df["days_present"] / 25)
joined_df.show()


+------+-------+-----+------------+
|  name|section|marks|days_present|
+------+-------+-----+------------+
|  Amit|   10-A|   89|          24|
|Anjali|   10-A|   78|          20|
| Kavya|   10-B|   92|          22|
| Rohit|   10-B|   85|          25|
| Sneha|   10-C|   80|          19|
+------+-------+-----+------------+

+------+-------+-----+------------+---------------+
|  name|section|marks|days_present|attendance_rate|
+------+-------+-----+------------+---------------+
|  Amit|   10-A|   89|          24|           0.96|
|Anjali|   10-A|   78|          20|            0.8|
| Kavya|   10-B|   92|          22|           0.88|
| Rohit|   10-B|   85|          25|            1.0|
| Sneha|   10-C|   80|          19|           0.76|
+------+-------+-----+------------+---------------+



In [22]:
#Grade students using when :A: >90, B: 80–90, C: <80.
from pyspark.sql.functions import when,col
joined_df=joined_df.withColumn("grade", when(joined_df["marks"] > 90, "A").when((joined_df["marks"] > 80) & (joined_df["marks"] <= 90), "B").otherwise("C"))
joined_df.show()
#Filter students with good grades but poor attendance (<80%).

df_filtered =joined_df.filter(((col("grade") == "A") | (col("grade") == "B")) &(col("attendance_rate") < 0.8))
df_filtered.select("name", "grade", "attendance_rate").show()


+------+-------+-----+------------+---------------+-----+
|  name|section|marks|days_present|attendance_rate|grade|
+------+-------+-----+------------+---------------+-----+
|  Amit|   10-A|   89|          24|           0.96|    B|
|Anjali|   10-A|   78|          20|            0.8|    C|
| Kavya|   10-B|   92|          22|           0.88|    A|
| Rohit|   10-B|   85|          25|            1.0|    B|
| Sneha|   10-C|   80|          19|           0.76|    C|
+------+-------+-----+------------+---------------+-----+

+----+-----+---------------+
|name|grade|attendance_rate|
+----+-----+---------------+
+----+-----+---------------+



#Module 4: Ingest CSV & JSON, Save to Parquet

In [24]:
from google.colab import files
uploaded = files.upload()


Saving employee.csv to employee.csv


In [25]:
from google.colab import files
uploaded=files.upload()


Saving employee_nested.json to employee_nested.json


In [26]:
df_csv=spark.read.option("header", True).option("inferSchema", True).csv("employee.csv")
df_csv.show()


+------+-----+-------+---------+------+
|emp_id| name|   dept|     city|salary|
+------+-----+-------+---------+------+
|   101| Anil|     IT|Bangalore| 80000|
|   102|Kiran|     HR|   Mumbai| 65000|
|   103|Deepa|Finance|  Chennai| 72000|
+------+-----+-------+---------+------+



In [27]:
df_json=spark.read.option("multiline", True).json("employee_nested.json")
df_json.printSchema()
df_json.show(truncate=False)


root
 |-- contact: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)

+------------------------------+---+-------+--------------------+
|contact                       |id |name   |skills              |
+------------------------------+---+-------+--------------------+
|{Hyderabad, nandi@example.com}|201|Nandini|[Python, Spark, SQL]|
+------------------------------+---+-------+--------------------+



In [29]:
#Flatten nested JSON using select , col , alias , explode .
from pyspark.sql.functions import col, explode
df_flat=df_json.select(col("id"),col("name"),col("contact.city").alias("city"),col("contact.email").alias("email"),explode(col("skills")).alias("skill"))
df_flat.show()



+---+-------+---------+-----------------+------+
| id|   name|     city|            email| skill|
+---+-------+---------+-----------------+------+
|201|Nandini|Hyderabad|nandi@example.com|Python|
|201|Nandini|Hyderabad|nandi@example.com| Spark|
|201|Nandini|Hyderabad|nandi@example.com|   SQL|
+---+-------+---------+-----------------+------+



In [30]:
#Save both as Parquet files partitioned by city.
df_csv.write.partitionBy("city").parquet("employee.parquet")
df_flat.write.partitionBy("city").parquet("employee_flat.parquet")

#Module 5: Spark SQL with Temp Views

In [34]:
#Register the students DataFrame as students_view
df_students.createOrReplaceTempView("students_view")

In [41]:
#a) Average marks per section
spark.sql("SELECT section, AVG(marks) AS avg_marks FROM students_view GROUP BY section").show()
#b) Top scorer in each section
spark.sql("SELECT section, MAX(marks) AS max_marks FROM students_view GROUP BY section").show()


+-------+---------+
|section|avg_marks|
+-------+---------+
|   10-A|     83.5|
|   10-B|     88.5|
|   10-C|     80.0|
+-------+---------+

+-------+---------+
|section|max_marks|
+-------+---------+
|   10-A|       89|
|   10-B|       92|
|   10-C|       80|
+-------+---------+



In [44]:
#c) Count of students in each grade category
spark.sql("SELECT CASE WHEN marks > 90 THEN 'A' WHEN marks >= 80 THEN 'B' ELSE 'C' END AS grade, COUNT(*) AS student_count FROM students_view GROUP BY grade").show()

+-----+-------------+
|grade|student_count|
+-----+-------------+
|    B|            3|
|    A|            1|
|    C|            1|
+-----+-------------+



In [46]:
#d) Students with marks above class average
spark.sql("SELECT name, marks FROM students_view WHERE marks > (SELECT AVG(marks) FROM students_view)").show()


+-----+-----+
| name|marks|
+-----+-----+
| Amit|   89|
|Kavya|   92|
|Rohit|   85|
+-----+-----+



In [51]:
df_attendance.createOrReplaceTempView("attendance_view")
#e) Attendance-adjusted performance
spark.sql("""
SELECT s.name, s.section, s.marks, a.days_present,
       ROUND(a.days_present / 25.0, 2) AS attendance_rate,
       CASE
           WHEN s.marks > 90 THEN 'A'
           WHEN s.marks >= 80 THEN 'B'
           ELSE 'C'
       END AS grade
FROM students_view s
JOIN attendance_view a ON s.name = a.name
""").show()


+------+-------+-----+------------+---------------+-----+
|  name|section|marks|days_present|attendance_rate|grade|
+------+-------+-----+------------+---------------+-----+
|  Amit|   10-A|   89|          24|           0.96|    B|
|Anjali|   10-A|   78|          20|           0.80|    C|
| Kavya|   10-B|   92|          22|           0.88|    A|
| Rohit|   10-B|   85|          25|           1.00|    B|
| Sneha|   10-C|   80|          19|           0.76|    B|
+------+-------+-----+------------+---------------+-----+



#Module 6: Partitioned Data & Incremental Loading

In [53]:
df_students.write.partitionBy("section").parquet("output/students/")

In [54]:
incremental=[("Tejas", "10-A", 91)]
df_inc=spark.createDataFrame(incremental, ["name", "section", "marks"])
df_inc.write.mode("append").partitionBy("section").parquet("output/students/")

In [59]:
#List files in output/students/ using Python.
import os
files=os.listdir("output/students/")
print(files)
#Read only partition 10-A and list students0.
df_10A=spark.read.parquet("output/students/section=10-A")
df_10A.show()
#Compare before/after counts for section 10-A .
df_students.filter("section = '10-A'").count()
df_10A.count()



['._SUCCESS.crc', 'section=10-A', 'section=10-B', '_SUCCESS', 'section=10-C']
+------+-----+
|  name|marks|
+------+-----+
|Anjali|   78|
| Tejas|   91|
|  Amit|   89|
+------+-----+



3

#Module 7: ETL Pipeline – End to End

In [60]:
from google.colab import files
uploaded = files.upload()


Saving emp_data.csv to emp_data.csv


In [61]:
df=spark.read.option("header", True).option("inferSchema", True).csv("emp_data.csv")
df.show()


+------+------+-------+------+-----+
|emp_id|  name|   dept|salary|bonus|
+------+------+-------+------+-----+
|     1| Arjun|     IT| 75000| 5000|
|     2| Kavya|     HR| 62000| NULL|
|     3| Sneha|Finance| 68000| 4000|
|     4|Ramesh|  Sales| 58000| NULL|
+------+------+-------+------+-----+



In [62]:
#Fill null bonuses with 2000 .
df=df.fillna({"bonus": 2000})
df.show()
#Create total_ctc = salary + bonus .
df=df.withColumn("total_ctc", df["salary"] + df["bonus"])
df.show()
#Filter employees with total_ctc > 65000 .
df_filtered=df.filter(df["total_ctc"] > 65000)
df_filtered.show()


+------+------+-------+------+-----+
|emp_id|  name|   dept|salary|bonus|
+------+------+-------+------+-----+
|     1| Arjun|     IT| 75000| 5000|
|     2| Kavya|     HR| 62000| 2000|
|     3| Sneha|Finance| 68000| 4000|
|     4|Ramesh|  Sales| 58000| 2000|
+------+------+-------+------+-----+

+------+------+-------+------+-----+---------+
|emp_id|  name|   dept|salary|bonus|total_ctc|
+------+------+-------+------+-----+---------+
|     1| Arjun|     IT| 75000| 5000|    80000|
|     2| Kavya|     HR| 62000| 2000|    64000|
|     3| Sneha|Finance| 68000| 4000|    72000|
|     4|Ramesh|  Sales| 58000| 2000|    60000|
+------+------+-------+------+-----+---------+

+------+-----+-------+------+-----+---------+
|emp_id| name|   dept|salary|bonus|total_ctc|
+------+-----+-------+------+-----+---------+
|     1|Arjun|     IT| 75000| 5000|    80000|
|     3|Sneha|Finance| 68000| 4000|    72000|
+------+-----+-------+------+-----+---------+



In [64]:
#Save result in:JSON format.
df_filtered.write.mode("overwrite").json("/tmp/final_employees_json")
#Parquet format partitioned by department.
df_filtered.write.mode("overwrite").partitionBy("dept").parquet("/tmp/final_employees_parquet")