#1. PySpark Setup & Initialization

# Exercise 1.1 – Setup Spark:
Initialize SparkSession with:

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("BotCampus Intermediate Session") \
.master("local[*]") \
.getOrCreate()

Exercise 1.2 – Load starter data:

In [3]:
data = [("Ananya", "Bangalore", 24),
("Ravi", "Hyderabad", 28),
("Kavya", "Delhi", 22),
("Meena", "Chennai", 25)]
columns = ["name", "city", "age"]
df = spark.createDataFrame(data, columns)
df.show()

+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Ananya|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
+------+---------+---+



#2. RDDs & Transformations
Exercise 2.1 – Create RDD from feedback:

In [4]:
feedback = spark.sparkContext.parallelize([
"Ravi from Bangalore loved the mobile app",
"Meena from Delhi reported poor response time",
"Ajay from Pune liked the delivery speed",
"Ananya from Hyderabad had an issue with UI",
"Rohit from Mumbai gave positive feedback"
])

Tasks:
1. Count total number of words.
2. Find top 3 most common words.
3. Remove stop words ( from , with , the , etc.).
4. Create a dictionary of word → count.

In [5]:
#1
# Split lines into words and flatten the list
words = feedback.flatMap(lambda line: line.lower().split())
total_word_count = words.count()
print("Total number of words:", total_word_count)


Total number of words: 35


In [6]:
#2

# Map each word to (word, 1) and reduce by key
word_counts = words.map(lambda word: (word, 1)) \
                   .reduceByKey(lambda a, b: a + b)

# Get top 3 most frequent words
top_3 = word_counts.takeOrdered(3, key=lambda x: -x[1])
print("Top 3 most common words:", top_3)


Top 3 most common words: [('from', 5), ('the', 2), ('loved', 1)]


In [7]:
#3
# Define simple stop words list
stop_words = {"from", "with", "the", "an", "and", "of", "to", "in"}

# Filter out stop words
filtered_words = words.filter(lambda word: word not in stop_words)

# Recount after stop word removal
filtered_counts = filtered_words.map(lambda word: (word, 1)) \
                                .reduceByKey(lambda a, b: a + b)

# Show filtered word counts
print("Filtered word counts:")
for word, count in filtered_counts.collect():
    print(f"{word}: {count}")


Filtered word counts:
loved: 1
app: 1
poor: 1
response: 1
liked: 1
speed: 1
ananya: 1
issue: 1
rohit: 1
mumbai: 1
positive: 1
feedback: 1
ravi: 1
bangalore: 1
mobile: 1
meena: 1
delhi: 1
reported: 1
time: 1
ajay: 1
pune: 1
delivery: 1
hyderabad: 1
had: 1
ui: 1
gave: 1


In [8]:
#4
# Convert to dictionary
word_count_dict = dict(filtered_counts.collect())
print("Word count dictionary:", word_count_dict)


Word count dictionary: {'loved': 1, 'app': 1, 'poor': 1, 'response': 1, 'liked': 1, 'speed': 1, 'ananya': 1, 'issue': 1, 'rohit': 1, 'mumbai': 1, 'positive': 1, 'feedback': 1, 'ravi': 1, 'bangalore': 1, 'mobile': 1, 'meena': 1, 'delhi': 1, 'reported': 1, 'time': 1, 'ajay': 1, 'pune': 1, 'delivery': 1, 'hyderabad': 1, 'had': 1, 'ui': 1, 'gave': 1}


#3. DataFrames – Transformations
Exercise 3.1 – Create exam_scores DataFrame:

In [9]:
scores = [
("Ravi", "Math", 88),
("Ananya", "Science", 92),
("Kavya", "English", 79),
("Ravi", "English", 67),
("Neha", "Math", 94),

("Meena", "Science", 85)
]
columns = ["name", "subject", "score"]
df_scores = spark.createDataFrame(scores, columns)

Tasks:
1. Add grade column ( >=90 → A, 80-89 → B, 70-79 → C, else D).
2. Group by subject, find average score.
3. Use when and otherwise to classify subject difficulty ( Math/Science =
Difficult).
4. Rank students per subject using Window function.
5. Apply UDF to format names (e.g., make all uppercase).

In [11]:
#1
from pyspark.sql.functions import when, col

df_scores = df_scores.withColumn("grade", when(col("score") >= 90, "A")
                                 .when((col("score") >= 80) & (col("score") < 90), "B")
                                 .when((col("score") >= 70) & (col("score") < 80), "C")
                                 .otherwise("D"))
df_scores.show()

+------+-------+-----+-----+
|  name|subject|score|grade|
+------+-------+-----+-----+
|  Ravi|   Math|   88|    B|
|Ananya|Science|   92|    A|
| Kavya|English|   79|    C|
|  Ravi|English|   67|    D|
|  Neha|   Math|   94|    A|
| Meena|Science|   85|    B|
+------+-------+-----+-----+



In [13]:
#2
avg_scores = df_scores.groupBy("subject").agg(avg("score").alias("avg_score"))
avg_scores.show()


+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
|English|     73.0|
+-------+---------+



In [14]:
#3
df_scores = df_scores.withColumn("difficulty", when(col("subject").isin("Math", "Science"), "Difficult")
                                               .otherwise("Easy"))
df_scores.show()


+------+-------+-----+-----+----------+
|  name|subject|score|grade|difficulty|
+------+-------+-----+-----+----------+
|  Ravi|   Math|   88|    B| Difficult|
|Ananya|Science|   92|    A| Difficult|
| Kavya|English|   79|    C|      Easy|
|  Ravi|English|   67|    D|      Easy|
|  Neha|   Math|   94|    A| Difficult|
| Meena|Science|   85|    B| Difficult|
+------+-------+-----+-----+----------+



In [17]:
#4
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

# Define window partitioned by subject and ordered by descending score
subject_window = Window.partitionBy("subject").orderBy(col("score").desc())

df_scores = df_scores.withColumn("rank", rank().over(subject_window))
df_scores.show()

+------+-------+-----+-----+----------+----+
|  name|subject|score|grade|difficulty|rank|
+------+-------+-----+-----+----------+----+
| Kavya|English|   79|    C|      Easy|   1|
|  Ravi|English|   67|    D|      Easy|   2|
|  Neha|   Math|   94|    A| Difficult|   1|
|  Ravi|   Math|   88|    B| Difficult|   2|
|Ananya|Science|   92|    A| Difficult|   1|
| Meena|Science|   85|    B| Difficult|   2|
+------+-------+-----+-----+----------+----+



In [19]:
#5
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define UDF to convert names to uppercase
def format_name(name):
    return name.upper()

format_name_udf = udf(format_name, StringType())

df_scores = df_scores.withColumn("name_upper", format_name_udf(col("name")))
df_scores.show()

+------+-------+-----+-----+----------+----+----------+
|  name|subject|score|grade|difficulty|rank|name_upper|
+------+-------+-----+-----+----------+----+----------+
| Kavya|English|   79|    C|      Easy|   1|     KAVYA|
|  Ravi|English|   67|    D|      Easy|   2|      RAVI|
|  Neha|   Math|   94|    A| Difficult|   1|      NEHA|
|  Ravi|   Math|   88|    B| Difficult|   2|      RAVI|
|Ananya|Science|   92|    A| Difficult|   1|    ANANYA|
| Meena|Science|   85|    B| Difficult|   2|     MEENA|
+------+-------+-----+-----+----------+----+----------+



Tasks:
1. Load both datasets into PySpark.
2. Print schema and infer nested structure.
3. Flatten the JSON (use explode , select , alias ).
4. Convert both to Parquet and write to /tmp/output .

In [28]:
#1
from google.colab import files
uploaded = files.upload()


Saving employee_nested.json to employee_nested (1).json
Saving students.csv to students (1).csv


In [29]:
#2
df_students = spark.read.csv("/content/students.csv", header=True, inferSchema=True)
df_students.printSchema()
df_students.show()



root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)

+---+-----+----------+---------+------+
| id| name|department|     city|salary|
+---+-----+----------+---------+------+
|  1| Amit|        IT|Bangalore| 78000|
|  2|Kavya|        HR|  Chennai| 62000|
|  3|Arjun|   Finance|Hyderabad| 55000|
+---+-----+----------+---------+------+



In [30]:
#2
df_employee = spark.read.json("/content/employee_nested.json", multiLine=True)
df_employee.printSchema()
df_employee.show(truncate=False)


root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pincode: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+---+-----+---------------+
|address         |id |name |skills         |
+----------------+---+-----+---------------+
|{Mumbai, 400001}|101|Sneha|[Python, Spark]|
+----------------+---+-----+---------------+



In [25]:
#3. Flatten the JSON (use explode , select , alias ).

from pyspark.sql.functions import explode, col

# Explode skills
df_flattened = df_employee.select(
    "id",
    "name",
    col("address.city").alias("city"),
    col("address.pincode").alias("pincode"),
    explode("skills").alias("skill")
)

df_flattened.show(truncate=False)


+---+-----+------+-------+------+
|id |name |city  |pincode|skill |
+---+-----+------+-------+------+
|101|Sneha|Mumbai|400001 |Python|
|101|Sneha|Mumbai|400001 |Spark |
+---+-----+------+-------+------+



In [31]:
#4
# Save students CSV as Parquet
df_students.write.mode("overwrite").parquet("/tmp/output/students_parquet")

# Save flattened JSON as Parquet
df_flattened.write.mode("overwrite").parquet("/tmp/output/employees_parquet")


In [35]:
#4
import shutil
import os
from google.colab import files

# Define the directory where the parquet files were saved
output_dir = "/tmp/output"

# Define the names of the parquet directories relative to the output_dir
students_parquet_dir_name = "students_parquet"
employees_parquet_dir_name = "employees_parquet"

# Define the desired names for the zip files (saved in /content for download)
students_zip_name = "/content/students_parquet.zip"
employees_zip_name = "/content/employees_parquet.zip"

# Zip the folders. The third argument is the base directory.
# The first argument is the base name of the archive (without the extension)
shutil.make_archive(students_zip_name.replace(".zip", ""), 'zip', output_dir, students_parquet_dir_name)
shutil.make_archive(employees_zip_name.replace(".zip", ""), 'zip', output_dir, employees_parquet_dir_name)

# Download the zipped parquet folders
files.download(students_zip_name)
files.download(employees_zip_name)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# 5. Spark SQL – Temp Views & Queries
Exercise 5.1 Create view from exam scores and run:

In [36]:
df_scores = spark.createDataFrame(scores, columns)

# Add grade column
df_scores = df_scores.withColumn("grade", when(df_scores.score >= 90, "A")
                                 .when((df_scores.score >= 80) & (df_scores.score < 90), "B")
                                 .when((df_scores.score >= 70) & (df_scores.score < 80), "C")
                                 .otherwise("D"))

In [37]:
df_scores.createOrReplaceTempView("exam_scores")


#Exercise 5.1 – SQL Queries

*  a) Top scorer per subject
*  b) Count of students per grade
*  c) Students with multiple subjects
       sql
       Copy
       Edit
*  d) Subjects with average score above 85  




In [38]:
#a)
spark.sql("""
SELECT subject, name, score
FROM (
  SELECT *, RANK() OVER (PARTITION BY subject ORDER BY score DESC) as rnk
  FROM exam_scores
)
WHERE rnk = 1
""").show()


+-------+------+-----+
|subject|  name|score|
+-------+------+-----+
|English| Kavya|   79|
|   Math|  Neha|   94|
|Science|Ananya|   92|
+-------+------+-----+



In [39]:
#b)
spark.sql("""
SELECT grade, COUNT(*) AS student_count
FROM exam_scores
GROUP BY grade
""").show()


+-----+-------------+
|grade|student_count|
+-----+-------------+
|    B|            2|
|    C|            1|
|    A|            2|
|    D|            1|
+-----+-------------+



In [41]:
#c)
spark.sql("""
SELECT name, COUNT(subject) AS subject_count
FROM exam_scores
GROUP BY name
HAVING COUNT(subject) > 1
""").show()


+----+-------------+
|name|subject_count|
+----+-------------+
|Ravi|            2|
+----+-------------+



In [42]:
#d)
spark.sql("""
SELECT subject, AVG(score) AS avg_score
FROM exam_scores
GROUP BY subject
HAVING AVG(score) > 85
""").show()


+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
+-------+---------+



Join with scores
Calculate attendance-adjusted grade:
If days_present < 20 → downgrade grade by one level

In [44]:
attendance_data = [
    ("Ravi", "Math", 18),
    ("Ananya", "Science", 25),
    ("Kavya", "English", 15),
    ("Ravi", "English", 21),
    ("Neha", "Math", 28),
    ("Meena", "Science", 19)
]
attendance_columns = ["name", "subject", "days_present"]

df_attendance = spark.createDataFrame(attendance_data, attendance_columns)


In [45]:
df_joined = df_scores.join(df_attendance, on=["name", "subject"], how="inner")
df_joined.show()


+------+-------+-----+-----+------------+
|  name|subject|score|grade|days_present|
+------+-------+-----+-----+------------+
|Ananya|Science|   92|    A|          25|
| Kavya|English|   79|    C|          15|
| Meena|Science|   85|    B|          19|
|  Neha|   Math|   94|    A|          28|
|  Ravi|English|   67|    D|          21|
|  Ravi|   Math|   88|    B|          18|
+------+-------+-----+-----+------------+



In [46]:
from pyspark.sql.functions import expr

# Define downgrade logic using when
df_adjusted = df_joined.withColumn("adjusted_grade",
    when(df_joined.days_present >= 20, df_joined.grade)
    .when(df_joined.grade == "A", "B")
    .when(df_joined.grade == "B", "C")
    .when(df_joined.grade == "C", "D")
    .otherwise("D")
)

df_adjusted.select("name", "subject", "score", "grade", "days_present", "adjusted_grade").show()


+------+-------+-----+-----+------------+--------------+
|  name|subject|score|grade|days_present|adjusted_grade|
+------+-------+-----+-----+------------+--------------+
|Ananya|Science|   92|    A|          25|             A|
| Kavya|English|   79|    C|          15|             D|
| Meena|Science|   85|    B|          19|             C|
|  Neha|   Math|   94|    A|          28|             A|
|  Ravi|English|   67|    D|          21|             D|
|  Ravi|   Math|   88|    B|          18|             C|
+------+-------+-----+-----+------------+--------------+



#6. Partitioned Load (Full + Incremental)
Initial Load:

In [47]:
df_scores.write.partitionBy("subject").parquet("/tmp/scores/")

**Incremental Load:**

In [48]:
incremental = [("Meena", "Math", 93)]
df_inc = spark.createDataFrame(incremental, columns)
df_inc.write.mode("append").partitionBy("subject").parquet("/tmp/scores/")

Task:
* List all folders inside /tmp/scores/
* Read only Math partition and display all entries.

In [50]:
import os

folders = os.listdir("/tmp/scores")
print("Partition folders:")
print(folders)


Partition folders:
['._SUCCESS.crc', 'subject=Science', 'subject=English', '_SUCCESS', 'subject=Math']


In [52]:
#Read Full Data from All Partitions
df_all = spark.read.parquet("/tmp/scores/")
df_all.show()

+------+-----+-----+-------+
|  name|score|grade|subject|
+------+-----+-----+-------+
|Ananya|   92|    A|Science|
| Meena|   85|    B|Science|
| Kavya|   79|    C|English|
|  Ravi|   67|    D|English|
|  Neha|   94|    A|   Math|
|  Ravi|   88|    B|   Math|
| Meena|   93| NULL|   Math|
+------+-----+-----+-------+



In [51]:
#Read only Math partition and display all entries.
df_math = spark.read.parquet("/tmp/scores/subject=Math")
df_math.show()


+-----+-----+-----+
| name|score|grade|
+-----+-----+-----+
| Neha|   94|    A|
| Ravi|   88|    B|
|Meena|   93| NULL|
+-----+-----+-----+



#7. ETL: Clean, Transform, Load
Raw CSV:

In [53]:
#1
from google.colab import files
uploaded = files.upload()

Saving employees.csv to employees.csv


Tasks:
1. Load data with header.
2. Fill missing bonus with 2000.
3. Calculate total_ctc = salary + bonus .
4. filter where total_ctc > 60,000.
5. Save final DataFrame to Parquet and JSON.

In [54]:
#1
df_raw = spark.read.csv("/content/employees.csv", header=True, inferSchema=True)
df_raw.show()


+------+-----+-------+------+-----+
|emp_id| name|   dept|salary|bonus|
+------+-----+-------+------+-----+
|     1|Arjun|     IT| 78000| 5000|
|     2|Kavya|     HR| 62000| NULL|
|     3|Sneha|Finance| 55000| 3000|
+------+-----+-------+------+-----+



In [55]:
#2
df_filled = df_raw.fillna({'bonus': 2000})
df_filled.show()


+------+-----+-------+------+-----+
|emp_id| name|   dept|salary|bonus|
+------+-----+-------+------+-----+
|     1|Arjun|     IT| 78000| 5000|
|     2|Kavya|     HR| 62000| 2000|
|     3|Sneha|Finance| 55000| 3000|
+------+-----+-------+------+-----+



In [56]:
#3
from pyspark.sql.functions import col

df_ctc = df_filled.withColumn("total_ctc", col("salary") + col("bonus"))
df_ctc.show()


+------+-----+-------+------+-----+---------+
|emp_id| name|   dept|salary|bonus|total_ctc|
+------+-----+-------+------+-----+---------+
|     1|Arjun|     IT| 78000| 5000|    83000|
|     2|Kavya|     HR| 62000| 2000|    64000|
|     3|Sneha|Finance| 55000| 3000|    58000|
+------+-----+-------+------+-----+---------+



In [57]:
#4
df_filtered = df_ctc.filter(col("total_ctc") > 60000)
df_filtered.show()


+------+-----+----+------+-----+---------+
|emp_id| name|dept|salary|bonus|total_ctc|
+------+-----+----+------+-----+---------+
|     1|Arjun|  IT| 78000| 5000|    83000|
|     2|Kavya|  HR| 62000| 2000|    64000|
+------+-----+----+------+-----+---------+



In [58]:
#5
df_filtered.write.mode("overwrite").parquet("/content/employees_final_parquet")


In [59]:
#5
df_filtered.write.mode("overwrite").json("/content/employees_final_json")
