# Module 1: Setup & SparkSession Initialization

Tasks:

    Install and configure PySpark in your local system or Colab.

    Initialize Spark

    Create a DataFrame

    Show schema, explain data types, and convert to RDD.

    Print .collect() and df.rdd.map() output.


In [1]:
# Install and configure PySpark in your local system or Colab.
from pyspark.sql import SparkSession

# Initialize Spark with:
spark = SparkSession.builder \
  .appName("BotCampus PySpark Practice") \
  .master("local[*]") \
  .getOrCreate()

In [2]:
# Create a DataFrame from:
data = [
  ("Anjali", "Bangalore", 24),
  ("Ravi", "Hyderabad", 28),
  ("Kavya", "Delhi", 22),
  ("Meena", "Chennai", 25),
  ("Arjun", "Mumbai", 30)
]
columns = ["name", "city", "age"]

df = spark.createDataFrame(data, columns)

In [3]:
# Show schema, explain data types, and convert to RDD.
df.printSchema()
df.show()

# Convert to RDD
rdd = df.rdd

root
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- age: long (nullable = true)

+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Anjali|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
| Arjun|   Mumbai| 30|
+------+---------+---+



In [4]:
#  Print .collect() and df.rdd.map() output.
print(rdd.collect())
print(rdd.map(lambda x: (x.name.upper(), x.age)).collect())

[Row(name='Anjali', city='Bangalore', age=24), Row(name='Ravi', city='Hyderabad', age=28), Row(name='Kavya', city='Delhi', age=22), Row(name='Meena', city='Chennai', age=25), Row(name='Arjun', city='Mumbai', age=30)]
[('ANJALI', 24), ('RAVI', 28), ('KAVYA', 22), ('MEENA', 25), ('ARJUN', 30)]


# Module 2: RDDs & Transformations

Scenario: You received app feedback from users in free-text.

Tasks:

    Split each line into words ( flatMap ).

    Remove stop words ( from , the , etc.).

    Count each word frequency using reduceByKey .

    Find top 3 most frequent non-stop words.

In [5]:
# Scenario: You received app feedback from users in free-text.
feedback = spark.sparkContext.parallelize([
  "Ravi from Bangalore loved the delivery",
  "Meena from Hyderabad had a late order",
  "Ajay from Pune liked the service",
  "Anjali from Delhi faced UI issues",
  "Rohit from Mumbai gave positive feedback"
])

stop_words = {"from", "the", "a", "an", "with", "had"}

In [7]:
word_counts = (
    feedback

    # Split each line into words ( flatMap ).
    .flatMap(lambda line: line.lower().split())

    # Remove stop words ( from , the , etc.).
    .filter(lambda word: word not in stop_words)

    # Count each word frequency using reduceByKey .
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)

In [8]:
# Find top 3 most frequent non-stop words.
top3 = word_counts.takeOrdered(3, key=lambda x: -x[1])
print(top3)

[('loved', 1), ('liked', 1), ('service', 1)]


# Module 3: DataFrames & Transformation (With Joins)

Tasks:

    Join both DataFrames on name .

    Create a new column: attendance_rate = days_present / 25 .

    Grade students using when :A: >90, B: 80–90, C: <80.

    Filter students with good grades but poor attendance (<80%).

In [9]:
# DataFrames:
students = [
  ("Amit", "10-A", 89),
  ("Kavya", "10-B", 92),
  ("Anjali", "10-A", 78),
  ("Rohit", "10-B", 85),
  ("Sneha", "10-C", 80)
]
columns = ["name", "section", "marks"]

attendance = [
  ("Amit", 24),
  ("Kavya", 22),
  ("Anjali", 20),
  ("Rohit", 25),
  ("Sneha", 19)
]
columns2 = ["name", "days_present"]

In [19]:
# Join both DataFrames on name .
df_students = spark.createDataFrame(students, columns)
df_attendance = spark.createDataFrame(attendance, columns2)

In [20]:
# Create a new column: attendance_rate = days_present / 25 .
from pyspark.sql.functions import col, when

df_joined = df_students.join(df_attendance, "name")
df_joined = df_joined.withColumn("attendance_rate", col("days_present") / 25)

df_joined.show()

+------+-------+-----+------------+---------------+
|  name|section|marks|days_present|attendance_rate|
+------+-------+-----+------------+---------------+
|  Amit|   10-A|   89|          24|           0.96|
|Anjali|   10-A|   78|          20|            0.8|
| Kavya|   10-B|   92|          22|           0.88|
| Rohit|   10-B|   85|          25|            1.0|
| Sneha|   10-C|   80|          19|           0.76|
+------+-------+-----+------------+---------------+



In [21]:
# Grade students using when :A: >90, B: 80–90, C: <80.
df_joined = df_joined.withColumn("grade",
    when(col("marks") > 90, "A")
    .when((col("marks") > 80) & (col("marks") <= 90), "B")
    .when((col("marks") >= 70) & (col("marks") <= 80), "C")
    .otherwise("D")
)
df_joined.show()

+------+-------+-----+------------+---------------+-----+
|  name|section|marks|days_present|attendance_rate|grade|
+------+-------+-----+------------+---------------+-----+
|  Amit|   10-A|   89|          24|           0.96|    B|
|Anjali|   10-A|   78|          20|            0.8|    C|
| Kavya|   10-B|   92|          22|           0.88|    A|
| Rohit|   10-B|   85|          25|            1.0|    B|
| Sneha|   10-C|   80|          19|           0.76|    C|
+------+-------+-----+------------+---------------+-----+



In [22]:
# Filter students with good grades but poor attendance (<80%).
df_filtered = df_joined.filter((col("grade").isin("A", "B")) & (col("attendance_rate") < 0.8))
df_filtered.show()

+----+-------+-----+------------+---------------+-----+
|name|section|marks|days_present|attendance_rate|grade|
+----+-------+-----+------------+---------------+-----+
+----+-------+-----+------------+---------------+-----+



# Module 4: Ingest CSV & JSON, Save to Parquet

Tasks:

    1. Ingest CSV:
    2. Ingest JSON:

    Read both formats into DataFrames.

    Flatten nested JSON using select , col , alias , explode .

    Save both as Parquet files partitioned by city.

In [28]:
# 1. Ingest CSV:
csv_data = [("101","Anil","IT","Bangalore",80000),
            ("102","Kiran","HR","Mumbai",65000),
            ("103","Deepa","Finance","Chennai",72000)]
columns = ["emp_id", "name", "dept", "city", "salary"]
df_csv = spark.createDataFrame(csv_data, columns)
df_csv.show()

+------+-----+-------+---------+------+
|emp_id| name|   dept|     city|salary|
+------+-----+-------+---------+------+
|   101| Anil|     IT|Bangalore| 80000|
|   102|Kiran|     HR|   Mumbai| 65000|
|   103|Deepa|Finance|  Chennai| 72000|
+------+-----+-------+---------+------+



In [29]:
# 2. Ingest JSON:
json_data = [{
    "id": 201,
    "name": "Nandini",
    "contact": {
        "email": "nandi@example.com",
        "city": "Hyderabad"
    },
    "skills": ["Python", "Spark", "SQL"]
}]

In [30]:
# Read both formats into DataFrames.
import pandas as pd
import json
df_json = spark.read.json(spark.sparkContext.parallelize([json.dumps(json_data[0])]))


# Flatten nested JSON using select , col , alias , explode .
from pyspark.sql.functions import col, explode

flattened = df_json.select(
    "id",
    "name",
    col("contact.email").alias("email"),
    col("contact.city").alias("city"),
    explode("skills").alias("skill")
)
flattened.show()

+---+-------+-----------------+---------+------+
| id|   name|            email|     city| skill|
+---+-------+-----------------+---------+------+
|201|Nandini|nandi@example.com|Hyderabad|Python|
|201|Nandini|nandi@example.com|Hyderabad| Spark|
|201|Nandini|nandi@example.com|Hyderabad|   SQL|
+---+-------+-----------------+---------+------+



In [31]:
# Save as Parquet
df_csv.write.mode("overwrite").partitionBy("city").parquet("/tmp/output/employees_csv")
flattened.write.mode("overwrite").partitionBy("city").parquet("/tmp/output/employees_json")

# Module 5: Spark SQL with Temp Views

Tasks:

    Register the students DataFrame as students_view .

    Write and run the following queries:
    
    a) Average marks per section
    b) Top scorer in each section
    c) Count of students in each grade category
    d) Students with marks above class average
    e) Attendance-adjusted performance

In [40]:
# Register the students DataFrame as students_view .
df_joined.createOrReplaceTempView("students_view")

In [41]:
# a) Average marks per section
spark.sql("""
    SELECT section, ROUND(AVG(marks), 2) AS avg_marks
    FROM students_view
    GROUP BY section
""").show()

+-------+---------+
|section|avg_marks|
+-------+---------+
|   10-C|     80.0|
|   10-A|     83.5|
|   10-B|     88.5|
+-------+---------+



In [42]:
# b) Top scorer in each section
spark.sql("""
    SELECT section, name, marks
    FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY section ORDER BY marks DESC) as rank
        FROM students_view
    ) ranked
    WHERE rank = 1
""").show()

+-------+-----+-----+
|section| name|marks|
+-------+-----+-----+
|   10-A| Amit|   89|
|   10-B|Kavya|   92|
|   10-C|Sneha|   80|
+-------+-----+-----+



In [43]:
# c) Count of students in each grade category
spark.sql("""
    SELECT grade, COUNT(*) AS student_count
    FROM students_view
    GROUP BY grade
""").show()

+-----+-------------+
|grade|student_count|
+-----+-------------+
|    B|            2|
|    C|            2|
|    A|            1|
+-----+-------------+



In [44]:
# d) Students with marks above class average
spark.sql("""
    SELECT *
    FROM students_view
    WHERE marks > (SELECT AVG(marks) FROM students_view)
""").show()

+-----+-------+-----+------------+---------------+-----+
| name|section|marks|days_present|attendance_rate|grade|
+-----+-------+-----+------------+---------------+-----+
| Amit|   10-A|   89|          24|           0.96|    B|
|Kavya|   10-B|   92|          22|           0.88|    A|
|Rohit|   10-B|   85|          25|            1.0|    B|
+-----+-------+-----+------------+---------------+-----+



In [45]:
# e) Attendance-adjusted performance
# (Example: adjusted_score = 0.7 * marks + 0.3 * attendance)
spark.sql("""
    SELECT name, section, marks, days_present,
           ROUND(0.7 * marks + 0.3 * days_present, 2) AS adjusted_score
    FROM students_view
""").show()

+------+-------+-----+------------+--------------+
|  name|section|marks|days_present|adjusted_score|
+------+-------+-----+------------+--------------+
|  Amit|   10-A|   89|          24|          69.5|
|Anjali|   10-A|   78|          20|          60.6|
| Kavya|   10-B|   92|          22|          71.0|
| Rohit|   10-B|   85|          25|          67.0|
| Sneha|   10-C|   80|          19|          61.7|
+------+-------+-----+------------+--------------+



# Module 6: Partitioned Data & Incremental Loading

Step 1: Full Load

Step 2: Incremental Load

Tasks:

    List files in output/students/ using Python.

    Read only partition 10-A and list students.

    Compare before/after counts for section 10-A .

In [60]:
# Step 1: Full Load
df_students.write.partitionBy("section").mode("overwrite").parquet("output/students/")
df_students.show()

+------+-------+-----+
|  name|section|marks|
+------+-------+-----+
|  Amit|   10-A|   89|
| Kavya|   10-B|   92|
|Anjali|   10-A|   78|
| Rohit|   10-B|   85|
| Sneha|   10-C|   80|
+------+-------+-----+



In [61]:
# Step 2: Incremental Load
incremental = [("Tejas", "10-A", 91)]
df_inc = spark.createDataFrame(incremental, ["name", "section", "marks"])
df_inc.write.mode("append").partitionBy("section").parquet("output/students/")
df_inc.show()

+-----+-------+-----+
| name|section|marks|
+-----+-------+-----+
|Tejas|   10-A|   91|
+-----+-------+-----+



In [62]:
# List files in output/students/ using Python.
import os

for root, dirs, files in os.walk("output/students/"):
    for f in files:
        print(os.path.join(root, f))


output/students/._SUCCESS.crc
output/students/_SUCCESS
output/students/section=10-A/.part-00001-8a1466c1-9339-42ed-beb5-c76c661a1032.c000.snappy.parquet.crc
output/students/section=10-A/part-00001-8a1466c1-9339-42ed-beb5-c76c661a1032.c000.snappy.parquet
output/students/section=10-A/part-00001-97d225d0-6390-425e-bfcc-6c2ca04fd5af.c000.snappy.parquet
output/students/section=10-A/.part-00001-97d225d0-6390-425e-bfcc-6c2ca04fd5af.c000.snappy.parquet.crc
output/students/section=10-A/part-00000-97d225d0-6390-425e-bfcc-6c2ca04fd5af.c000.snappy.parquet
output/students/section=10-A/.part-00000-97d225d0-6390-425e-bfcc-6c2ca04fd5af.c000.snappy.parquet.crc
output/students/section=10-B/part-00001-97d225d0-6390-425e-bfcc-6c2ca04fd5af.c000.snappy.parquet
output/students/section=10-B/.part-00001-97d225d0-6390-425e-bfcc-6c2ca04fd5af.c000.snappy.parquet.crc
output/students/section=10-B/part-00000-97d225d0-6390-425e-bfcc-6c2ca04fd5af.c000.snappy.parquet
output/students/section=10-B/.part-00000-97d225d0-63

In [63]:
# Read only partition 10-A and list students.
df_10a = spark.read.parquet("output/students/section=10-A")
df_10a.show()

+------+-----+
|  name|marks|
+------+-----+
|Anjali|   78|
| Tejas|   91|
|  Amit|   89|
+------+-----+



In [64]:
# Compare before/after counts for section 10-A .

# Before adding incremental record
df_before = spark.read.parquet("output/students/section=10-A")
count_before = df_before.count()

# After incremental load
df_after = spark.read.parquet("output/students/")
count_after = df_after.filter(col("section") == "10-A").count()

print(f"Before count: {count_before}, After count: {count_after}")


Before count: 3, After count: 3


In [None]:
# Module 7: ETL Pipeline – End to End

Tasks:

    Load CSV with inferred schema.

    Fill null bonuses with 2000 .

    Create total_ctc = salary + bonus .

    Filter employees with total_ctc > 65000 .

    Save result in:
      JSON format.
      Parquet format partitioned by department.

In [65]:
from google.colab import files
uploaded = files.upload()

Saving employee_raw.csv to employee_raw.csv


In [67]:
# Load CSV with inferred schema.
df_employees = spark.read.csv("employee_raw.csv", header=True, inferSchema=True)
df_employees.show()

+------+------+-------+------+-----+
|emp_id|  name|   dept|salary|bonus|
+------+------+-------+------+-----+
|     1| Arjun|     IT| 75000| 5000|
|     2| Kavya|     HR| 62000| NULL|
|     3| Sneha|Finance| 68000| 4000|
|     4|Ramesh|  Sales| 58000| NULL|
+------+------+-------+------+-----+



In [71]:
# Fill null bonuses with 2000 .
from pyspark.sql.functions import coalesce

df_filled = df.fillna({'bonus': 2000})
df_filled.show()

+------+------+-------+------+-----+
|emp_id|  name|   dept|salary|bonus|
+------+------+-------+------+-----+
|     1| Arjun|     IT| 75000| 5000|
|     2| Kavya|     HR| 62000| 2000|
|     3| Sneha|Finance| 68000| 4000|
|     4|Ramesh|  Sales| 58000| 2000|
+------+------+-------+------+-----+



In [72]:
# Create total_ctc = salary + bonus .
from pyspark.sql.functions import col

df_total_ctc = df_filled.withColumn("total_ctc", col("salary") + col("bonus"))
df_total_ctc.show()

+------+------+-------+------+-----+---------+
|emp_id|  name|   dept|salary|bonus|total_ctc|
+------+------+-------+------+-----+---------+
|     1| Arjun|     IT| 75000| 5000|    80000|
|     2| Kavya|     HR| 62000| 2000|    64000|
|     3| Sneha|Finance| 68000| 4000|    72000|
|     4|Ramesh|  Sales| 58000| 2000|    60000|
+------+------+-------+------+-----+---------+



In [73]:
# Filter employees with total_ctc > 65000.
filtered_df = df_total_ctc.filter(col("total_ctc") > 65000)
filtered_df.show()

+------+-----+-------+------+-----+---------+
|emp_id| name|   dept|salary|bonus|total_ctc|
+------+-----+-------+------+-----+---------+
|     1|Arjun|     IT| 75000| 5000|    80000|
|     3|Sneha|Finance| 68000| 4000|    72000|
+------+-----+-------+------+-----+---------+



In [74]:
# Save result in:
#   JSON format.
filtered_df.write.mode("overwrite").json("/tmp/clean_employees/json")

#   Parquet format partitioned by department.
filtered_df.write.mode("overwrite").partitionBy("dept").parquet("/tmp/clean_employees/parquet")
# Read Parquet directory into a DataFrame
parquet_df = spark.read.parquet("/tmp/clean_employees/parquet")
parquet_df.show()

+------+-----+------+-----+---------+-------+
|emp_id| name|salary|bonus|total_ctc|   dept|
+------+-----+------+-----+---------+-------+
|     3|Sneha| 68000| 4000|    72000|Finance|
|     1|Arjun| 75000| 5000|    80000|     IT|
+------+-----+------+-----+---------+-------+



In [75]:
# Download the JSON file
# Convert Spark DataFrame to Pandas
pandas_df = filtered_df.toPandas()

# Save as a single JSON file
json_path = "/tmp/clean_employees.json"
pandas_df.to_json(json_path, orient="records", lines=True)

from google.colab import files
files.download(json_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>