**1. PySpark Setup & Initialization**

Exercise 1.1 – Setup Spark:

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("BotCampus Intermediate Session") \
.master("local[*]") \
.getOrCreate()

Exercise 1.2 – Load starter data:

In [2]:
data = [("Ananya", "Bangalore", 24),
("Ravi", "Hyderabad", 28),
("Kavya", "Delhi", 22),
("Meena", "Chennai", 25)]
columns = ["name", "city", "age"]
df = spark.createDataFrame(data, columns)
df.show()

+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Ananya|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
+------+---------+---+



**2. RDDs & Transformations**

Exercise 2.1 – Create RDD from feedback:

In [3]:
feedback = spark.sparkContext.parallelize([
"Ravi from Bangalore loved the mobile app",
"Meena from Delhi reported poor response time",
"Ajay from Pune liked the delivery speed",
"Ananya from Hyderabad had an issue with UI",
"Rohit from Mumbai gave positive feedback"
])

**Tasks:**

Count total number of words.

In [4]:
feedback.flatMap(lambda x: x.split()).count()

35

Find top 3 most common words.

In [5]:
(feedback.flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x, y: x + y)
    .takeOrdered(3, key=lambda pair: -pair[1]))

[('from', 5), ('the', 2), ('loved', 1)]
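
As a cross-check on the two results above, the same counts can be reproduced in plain Python with `collections.Counter`. This is only a sketch that runs outside Spark. Every word other than `from` and `the` occurs once, so the third entry of a top-3 list is a tie; the `(-count, word)` sort key used here is an assumption added to make the tie-break deterministic, and it picks `Ajay` rather than `loved`.

```python
from collections import Counter

feedback_lines = [
    "Ravi from Bangalore loved the mobile app",
    "Meena from Delhi reported poor response time",
    "Ajay from Pune liked the delivery speed",
    "Ananya from Hyderabad had an issue with UI",
    "Rohit from Mumbai gave positive feedback",
]

# Split every line on whitespace, mirroring flatMap(lambda x: x.split()).
words = [word for line in feedback_lines for word in line.split()]
total_words = len(words)

# Count occurrences, mirroring map(...).reduceByKey(...).
counts = Counter(words)

# Sort by descending count, then alphabetically, so ties resolve the same way on every run.
top3 = sorted(counts.items(), key=lambda pair: (-pair[1], pair[0]))[:3]

print(total_words)
print(top3)
```

The same tuple key could be passed to `takeOrdered` if a reproducible third entry matters.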

Remove stop words (from, with, the, etc.).

In [6]:
stop_words = {"from", "with", "the", "had", "an", "a", "of", "and", "in", "to"}
(feedback.flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x, y: x + y)
    .filter(lambda pair: pair[0] not in stop_words)
    .collect())

[('loved', 1),
 ('app', 1),
 ('Delhi', 1),
 ('poor', 1),
 ('response', 1),
 ('Ajay', 1),
 ('Pune', 1),
 ('liked', 1),
 ('speed', 1),
 ('Hyderabad', 1),
 ('issue', 1),
 ('UI', 1),
 ('Rohit', 1),
 ('Mumbai', 1),
 ('positive', 1),
 ('feedback', 1),
 ('Ravi', 1),
 ('Bangalore', 1),
 ('mobile', 1),
 ('Meena', 1),
 ('reported', 1),
 ('time', 1),
 ('delivery', 1),
 ('Ananya', 1),
 ('gave', 1)]
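
The filter above is case-sensitive and runs after the counts have been reduced. A plain-Python sketch of one alternative follows: lower-case each word and drop stop words before counting. The lower-casing step is an assumption, not part of the exercise, and it changes the keys (for example `UI` becomes `ui`).

```python
from collections import Counter

feedback_lines = [
    "Ravi from Bangalore loved the mobile app",
    "Meena from Delhi reported poor response time",
    "Ajay from Pune liked the delivery speed",
    "Ananya from Hyderabad had an issue with UI",
    "Rohit from Mumbai gave positive feedback",
]
stop_words = {"from", "with", "the", "had", "an", "a", "of", "and", "in", "to"}

# Normalise case first so that "The" and "the" would be treated alike.
tokens = [word.lower() for line in feedback_lines for word in line.split()]

# Drop stop words before counting, so they never reach the aggregation step.
kept = [token for token in tokens if token not in stop_words]
filtered_counts = Counter(kept)

print(len(filtered_counts))
```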

Create a dictionary of word → count.

In [7]:
(feedback.flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x, y: x + y)
    .collectAsMap())

{'from': 5,
 'loved': 1,
 'app': 1,
 'Delhi': 1,
 'poor': 1,
 'response': 1,
 'Ajay': 1,
 'Pune': 1,
 'liked': 1,
 'speed': 1,
 'Hyderabad': 1,
 'an': 1,
 'issue': 1,
 'with': 1,
 'UI': 1,
 'Rohit': 1,
 'Mumbai': 1,
 'positive': 1,
 'feedback': 1,
 'Ravi': 1,
 'Bangalore': 1,
 'the': 2,
 'mobile': 1,
 'Meena': 1,
 'reported': 1,
 'time': 1,
 'delivery': 1,
 'Ananya': 1,
 'had': 1,
 'gave': 1}

**3. DataFrames – Transformations**

Exercise 3.1 – Create exam_scores DataFrame:

In [8]:
scores = [
("Ravi", "Math", 88),
("Ananya", "Science", 92),
("Kavya", "English", 79),
("Ravi", "English", 67),
("Neha", "Math", 94),
("Meena", "Science", 85)
]

columns = ["name", "subject", "score"]
df_scores = spark.createDataFrame(scores, columns)

**Tasks:**

Add a grade column (>=90 → A, 80-89 → B, 70-79 → C, else D).

In [9]:
import pyspark.sql.functions as F

# when() branches are evaluated in order; the first match wins.
score = F.col("score")
df_scores = df_scores.withColumn(
    "grade",
    F.when(score >= 90, "A")
     .when((score >= 80) & (score < 90), "B")
     .when((score >= 70) & (score < 80), "C")
     .otherwise("D"),
)
df_scores.show()

+------+-------+-----+-----+
|  name|subject|score|grade|
+------+-------+-----+-----+
|  Ravi|   Math|   88|    B|
|Ananya|Science|   92|    A|
| Kavya|English|   79|    C|
|  Ravi|English|   67|    D|
|  Neha|   Math|   94|    A|
| Meena|Science|   85|    B|
+------+-------+-----+-----+
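
The grade bands can be sanity-checked at their boundaries with a small plain-Python helper. `grade_for` is a hypothetical name used only for this sketch; it mirrors the thresholds in the task. Because chained `when` clauses are evaluated in order and the first match wins, the upper bounds in the Spark expression are redundant, and the helper relies on the same ordering.

```python
def grade_for(score):
    """Return the letter grade for a numeric score."""
    # Branches are checked top to bottom, so each one needs only a lower bound.
    if score >= 90:
        return "A"
    if score >= 80:
        return "B"
    if score >= 70:
        return "C"
    return "D"

# Two scores from the exercise data plus the band boundaries.
for value in (94, 90, 89, 80, 79, 70, 69, 67):
    print(value, grade_for(value))
```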



Group by subject, find average score.

In [10]:
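# alias() on the aggregated DataFrame names the DataFrame, not the column,
# so the alias is applied to the aggregate expression instead.
df_scores.groupBy("subject").agg(F.avg("score").alias("avg_score")).show()

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
|English|     73.0|
+-------+---------+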



Use when and otherwise to classify subject difficulty (Math/Science = Difficult).

In [11]:
df_scores = df_scores.withColumn(
    "difficulty",
    F.when(df_scores.subject.isin(["Math", "Science"]), "Difficult").otherwise("Easy"),
)
df_scores.show()

+------+-------+-----+-----+----------+
|  name|subject|score|grade|difficulty|
+------+-------+-----+-----+----------+
|  Ravi|   Math|   88|    B| Difficult|
|Ananya|Science|   92|    A| Difficult|
| Kavya|English|   79|    C|      Easy|
|  Ravi|English|   67|    D|      Easy|
|  Neha|   Math|   94|    A| Difficult|
| Meena|Science|   85|    B| Difficult|
+------+-------+-----+-----+----------+



Rank students per subject using a Window function.

In [12]:
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("subject").orderBy(F.desc("score"))
df_scores.withColumn("rank",F.rank().over(windowSpec)).show()

+------+-------+-----+-----+----------+----+
|  name|subject|score|grade|difficulty|rank|
+------+-------+-----+-----+----------+----+
| Kavya|English|   79|    C|      Easy|   1|
|  Ravi|English|   67|    D|      Easy|   2|
|  Neha|   Math|   94|    A| Difficult|   1|
|  Ravi|   Math|   88|    B| Difficult|   2|
|Ananya|Science|   92|    A| Difficult|   1|
| Meena|Science|   85|    B| Difficult|   2|
+------+-------+-----+-----+----------+----+
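
The sample data has no tied scores within a subject, so `rank` gives the same numbers that `dense_rank` or `row_number` would. The plain-Python sketch below illustrates how the first two differ once ties occur. The scores are made up for illustration and are not in the dataset, and `rank_values` and `dense_rank_values` are hypothetical helper names.

```python
def rank_values(scores):
    """Competition ranking: tied scores share a rank and the following rank is skipped."""
    ordered = sorted(scores, reverse=True)
    # The rank of a score is 1 plus the number of strictly higher scores.
    return [1 + sum(1 for other in ordered if other > s) for s in ordered]

def dense_rank_values(scores):
    """Dense ranking: tied scores share a rank and no rank is skipped."""
    ordered = sorted(scores, reverse=True)
    distinct = sorted(set(ordered), reverse=True)
    return [distinct.index(s) + 1 for s in ordered]

sample = [88, 94, 88, 70]  # hypothetical scores containing one tie
print(rank_values(sample))
print(dense_rank_values(sample))
```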



Apply UDF to format names (e.g., make all uppercase).

In [13]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

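# F.upper would do the same without a Python UDF; a UDF is used because the task asks for one.
def upper_name(name):
    # Spark passes None for null values, so guard before calling .upper().
    return name.upper() if name is not None else None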

upper_udf = udf(upper_name, StringType())
df_scores=df_scores.withColumn("name", upper_udf(df_scores["name"]))
df_scores.show()


+------+-------+-----+-----+----------+
|  name|subject|score|grade|difficulty|
+------+-------+-----+-----+----------+
|  RAVI|   Math|   88|    B| Difficult|
|ANANYA|Science|   92|    A| Difficult|
| KAVYA|English|   79|    C|      Easy|
|  RAVI|English|   67|    D|      Easy|
|  NEHA|   Math|   94|    A| Difficult|
| MEENA|Science|   85|    B| Difficult|
+------+-------+-----+-----+----------+



**4. Ingest CSV & JSON – Save to Parquet**

Dataset 1: CSV file: students.csv

In [14]:
data="""id,name,department,city,salary
1,Amit,IT,Bangalore,78000
2,Kavya,HR,Chennai,62000
3,Arjun,Finance,Hyderabad,55000"""

with open("students.csv", "w") as f:
    f.write(data)

Dataset 2: JSON file: employee_nested.json

In [15]:
import json
data=[{
"id": 101,
"name": "Sneha",
"address": {
"city": "Mumbai",
"pincode": 400001
},
"skills": ["Python", "Spark"]
}
]

with open("employee_nested.json", "w") as f:
    json.dump(data, f)



In [16]:
df_csv=spark.read.csv("students.csv",header=True)
df_csv.show()

df_json=spark.read.json("employee_nested.json")
df_json.show()

+---+-----+----------+---------+------+
| id| name|department|     city|salary|
+---+-----+----------+---------+------+
|  1| Amit|        IT|Bangalore| 78000|
|  2|Kavya|        HR|  Chennai| 62000|
|  3|Arjun|   Finance|Hyderabad| 55000|
+---+-----+----------+---------+------+

+----------------+---+-----+---------------+
|         address| id| name|         skills|
+----------------+---+-----+---------------+
|{Mumbai, 400001}|101|Sneha|[Python, Spark]|
+----------------+---+-----+---------------+



Print schema and infer nested structure.

In [17]:
df_csv.printSchema()
df_json.printSchema()
df_json.show(truncate=False)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: string (nullable = true)

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pincode: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+---+-----+---------------+
|address         |id |name |skills         |
+----------------+---+-----+---------------+
|{Mumbai, 400001}|101|Sneha|[Python, Spark]|
+----------------+---+-----+---------------+



Flatten the JSON (use explode, select, alias).

In [18]:
df_flat = df_json.select(
    F.col("id"),
    F.col("name"),
    F.col("address.city").alias("city"),
    F.col("address.pincode").alias("pincode"),
    F.explode(F.col("skills")).alias("skill"),  # one output row per skill
)
df_flat.show()

+---+-----+------+-------+------+
| id| name|  city|pincode| skill|
+---+-----+------+-------+------+
|101|Sneha|Mumbai| 400001|Python|
|101|Sneha|Mumbai| 400001| Spark|
+---+-----+------+-------+------+
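
To make the effect of `explode` concrete, here is a plain-Python sketch of the same flattening applied to the record written to employee_nested.json. It illustrates the row multiplication only, not how Spark implements it; `flatten_employee` is a hypothetical helper.

```python
record = {
    "id": 101,
    "name": "Sneha",
    "address": {"city": "Mumbai", "pincode": 400001},
    "skills": ["Python", "Spark"],
}

def flatten_employee(rec):
    """Yield one flat row per skill, lifting the nested address fields to the top level."""
    for skill in rec["skills"]:
        yield {
            "id": rec["id"],
            "name": rec["name"],
            "city": rec["address"]["city"],
            "pincode": rec["address"]["pincode"],
            "skill": skill,
        }

rows = list(flatten_employee(record))
for row in rows:
    print(row)
```

An empty `skills` list yields no rows here, which matches `explode`; Spark's `explode_outer` keeps such rows with a null skill instead.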



Convert both to Parquet and write to /tmp/output

In [19]:
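# mode("overwrite") lets the cell be re-run without a "path already exists" error.
df_csv.write.mode("overwrite").parquet("/tmp/output/students.parquet")
df_flat.write.mode("overwrite").parquet("/tmp/output/employee_nested.parquet")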

**5. Spark SQL – Temp Views & Queries**

Exercise 5.1 – Create view from exam scores and run:

In [20]:
df_scores.show()
df_scores.createOrReplaceTempView("exam_scores")

+------+-------+-----+-----+----------+
|  name|subject|score|grade|difficulty|
+------+-------+-----+-----+----------+
|  RAVI|   Math|   88|    B| Difficult|
|ANANYA|Science|   92|    A| Difficult|
| KAVYA|English|   79|    C|      Easy|
|  RAVI|English|   67|    D|      Easy|
|  NEHA|   Math|   94|    A| Difficult|
| MEENA|Science|   85|    B| Difficult|
+------+-------+-----+-----+----------+



a) Top scorer per subject

In [21]:
spark.sql("""
    select name, subject, score
    from exam_scores
    where (subject, score) in (
        select subject, max(score) from exam_scores group by subject
    )
""").show()

+------+-------+-----+
|  name|subject|score|
+------+-------+-----+
|ANANYA|Science|   92|
| KAVYA|English|   79|
|  NEHA|   Math|   94|
+------+-------+-----+



b) Count of students per grade

In [22]:
spark.sql("select grade,count(*) as count from exam_scores group by grade").show()

+-----+-----+
|grade|count|
+-----+-----+
|    B|    2|
|    C|    1|
|    A|    2|
|    D|    1|
+-----+-----+



c) Students with multiple subjects

In [23]:
spark.sql("select name, count(subject) as Count from exam_scores group by name having count(subject) > 1").show()

+----+-----+
|name|Count|
+----+-----+
|RAVI|    2|
+----+-----+



d) Subjects with average score above 85

In [24]:
spark.sql("select subject,avg(score) as avg_score from exam_scores group by subject having avg_score>85").show()

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
+-------+---------+
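
The grouping logic in queries a), c) and d) can be tried outside Spark with Python's built-in `sqlite3` module. This is a stand-in for illustration only: SQLite is not Spark SQL, the table is rebuilt by hand from the rows shown above, and query a) is rewritten as a join against each subject's maximum score, which is an alternative formulation rather than the one used in the cell.

```python
import sqlite3

rows = [
    ("RAVI", "Math", 88, "B"),
    ("ANANYA", "Science", 92, "A"),
    ("KAVYA", "English", 79, "C"),
    ("RAVI", "English", 67, "D"),
    ("NEHA", "Math", 94, "A"),
    ("MEENA", "Science", 85, "B"),
]

conn = sqlite3.connect(":memory:")
conn.execute("create table exam_scores (name text, subject text, score integer, grade text)")
conn.executemany("insert into exam_scores values (?, ?, ?, ?)", rows)

# a) Top scorer per subject, via a join on each subject's maximum score.
top_scorers = conn.execute("""
    select e.name, e.subject, e.score
    from exam_scores e
    join (select subject, max(score) as top_score
          from exam_scores group by subject) m
      on e.subject = m.subject and e.score = m.top_score
    order by e.subject
""").fetchall()

# c) Students appearing in more than one subject.
multi_subject = conn.execute("""
    select name, count(subject) as subject_count
    from exam_scores group by name
    having count(subject) > 1
""").fetchall()

# d) Subjects whose average score is above 85.
strong_subjects = conn.execute("""
    select subject, avg(score) as avg_score
    from exam_scores group by subject
    having avg(score) > 85
    order by subject
""").fetchall()

conn.close()
print(top_scorers)
print(multi_subject)
print(strong_subjects)
```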



Exercise 5.2 – Create another DataFrame attendance(name, days_present) and:

In [25]:
attendance = [("RAVI", 25), ("ANANYA", 18), ("KAVYA", 22), ("NEHA", 19), ("MEENA", 20)]
df_attendance = spark.createDataFrame(attendance, ["name", "days_present"])
df_attendance.show()

+------+------------+
|  name|days_present|
+------+------------+
|  RAVI|          25|
|ANANYA|          18|
| KAVYA|          22|
|  NEHA|          19|
| MEENA|          20|
+------+------------+



Join with scores

In [26]:
df_attendance.createOrReplaceTempView("attendance")
df_scores.createOrReplaceTempView("exam_scores")
df_merge = spark.sql("""
    select e.name, e.subject, e.score, e.grade, e.difficulty, a.days_present
    from exam_scores e
    join attendance a on e.name = a.name
""")
df_merge.show()

+------+-------+-----+-----+----------+------------+
|  name|subject|score|grade|difficulty|days_present|
+------+-------+-----+-----+----------+------------+
|ANANYA|Science|   92|    A| Difficult|          18|
| KAVYA|English|   79|    C|      Easy|          22|
| MEENA|Science|   85|    B| Difficult|          20|
|  NEHA|   Math|   94|    A| Difficult|          19|
|  RAVI|   Math|   88|    B| Difficult|          25|
|  RAVI|English|   67|    D|      Easy|          25|
+------+-------+-----+-----+----------+------------+



Calculate attendance-adjusted grade:

If days_present < 20 → downgrade grade by one level

In [27]:
grade_order = {"A": 1, "B": 2, "C": 3, "D": 4}
reverse_grade_order = {v: k for k, v in grade_order.items()}

def downgrade_grade(grade, days):
    if days is None:
        return grade
    order = grade_order.get(grade, 4)
    if days < 20 and order < 4:
        order += 1
    return reverse_grade_order.get(order, grade)

downgrade_udf = udf(downgrade_grade, StringType())
df_merge=df_merge.withColumn("adjusted_grade",downgrade_udf(df_merge["grade"],df_merge["days_present"]))
df_merge.show()


+------+-------+-----+-----+----------+------------+--------------+
|  name|subject|score|grade|difficulty|days_present|adjusted_grade|
+------+-------+-----+-----+----------+------------+--------------+
|ANANYA|Science|   92|    A| Difficult|          18|             B|
| KAVYA|English|   79|    C|      Easy|          22|             C|
| MEENA|Science|   85|    B| Difficult|          20|             B|
|  NEHA|   Math|   94|    A| Difficult|          19|             B|
|  RAVI|   Math|   88|    B| Difficult|          25|             B|
|  RAVI|English|   67|    D|      Easy|          25|             D|
+------+-------+-----+-----+----------+------------+--------------+
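
Since the downgrade rule is plain Python, it can be checked against the joined rows before being wrapped in a UDF. The sketch below is an equivalent formulation that indexes into an ordered tuple of grades; `downgrade` is a hypothetical name, and treating unknown grades as D follows the default used in the function above.

```python
GRADES = ("A", "B", "C", "D")  # ordered from best to worst

def downgrade(grade, days_present):
    """Drop the grade by one level when attendance is below 20 days; D stays D."""
    if days_present is None:
        return grade
    # Unknown grades are treated as the lowest level, as in the UDF.
    position = GRADES.index(grade) if grade in GRADES else len(GRADES) - 1
    if days_present < 20:
        position = min(position + 1, len(GRADES) - 1)
    return GRADES[position]

# (grade, days_present) pairs taken from the joined table.
joined = [("A", 18), ("C", 22), ("B", 20), ("A", 19), ("B", 25), ("D", 25)]
adjusted = [downgrade(grade, days) for grade, days in joined]
print(adjusted)
```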



**6. Partitioned Load (Full + Incremental)**

Initial Load:

In [28]:
df_scores.write.partitionBy("subject").parquet("/tmp/scores/")

Incremental Load:

In [29]:
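# Note: these rows carry only name, subject and score, and the name is not
# upper-cased, so grade and difficulty come back as NULL when the data is read.
incremental = [("Meena", "Math", 93)]
df_inc = spark.createDataFrame(incremental, columns)
df_inc.write.mode("append").partitionBy("subject").parquet("/tmp/scores/")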

List all folders inside /tmp/scores/

In [30]:
import os
print(os.listdir("/tmp/scores/"))

['._SUCCESS.crc', 'subject=Science', 'subject=English', '_SUCCESS', 'subject=Math']
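
`os.listdir` returns marker files such as `_SUCCESS` alongside the partition folders. A minimal sketch for keeping only the partition directories and parsing their `column=value` names follows. `list_partitions` is a hypothetical helper, and the demo runs against a temporary directory that imitates the layout rather than the real /tmp/scores/.

```python
import os
import tempfile

def list_partitions(base_path):
    """Return {column: [values]} for sub-directories named like 'column=value'."""
    partitions = {}
    for entry in sorted(os.listdir(base_path)):
        full_path = os.path.join(base_path, entry)
        # Skip files (_SUCCESS, .crc) and anything that is not a key=value directory.
        if not os.path.isdir(full_path) or "=" not in entry:
            continue
        column, value = entry.split("=", 1)
        partitions.setdefault(column, []).append(value)
    return partitions

# Imitate the partition layout in a throwaway directory.
with tempfile.TemporaryDirectory() as base:
    for name in ("subject=Science", "subject=English", "subject=Math"):
        os.mkdir(os.path.join(base, name))
    open(os.path.join(base, "_SUCCESS"), "w").close()
    found = list_partitions(base)

print(found)
```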


Read only Math partition and display entries:

In [31]:
# Reading the partition directory directly omits the partition column (subject).
df_math=spark.read.parquet("/tmp/scores/subject=Math")
df_math.show()

+-----+-----+-----+----------+
| name|score|grade|difficulty|
+-----+-----+-----+----------+
| NEHA|   94|    A| Difficult|
| RAVI|   88|    B| Difficult|
|Meena|   93| NULL|      NULL|
+-----+-----+-----+----------+



**7. ETL: Clean, Transform, Load**

Raw CSV:

In [32]:
data="""emp_id,name,dept,salary,bonus
1,Arjun,IT,78000,5000
2,Kavya,HR,62000,
3,Sneha,Finance,55000,3000"""

with open("employee.csv", "w") as f:
    f.write(data)

Load data with header.

In [33]:
df_emp=spark.read.csv("employee.csv",header=True)
df_emp.show()

+------+-----+-------+------+-----+
|emp_id| name|   dept|salary|bonus|
+------+-----+-------+------+-----+
|     1|Arjun|     IT| 78000| 5000|
|     2|Kavya|     HR| 62000| NULL|
|     3|Sneha|Finance| 55000| 3000|
+------+-----+-------+------+-----+



Fill missing bonus with 2000.

In [34]:
df_emp=df_emp.fillna({"bonus":2000})
df_emp.show()

+------+-----+-------+------+-----+
|emp_id| name|   dept|salary|bonus|
+------+-----+-------+------+-----+
|     1|Arjun|     IT| 78000| 5000|
|     2|Kavya|     HR| 62000| 2000|
|     3|Sneha|Finance| 55000| 3000|
+------+-----+-------+------+-----+



Calculate total_ctc = salary + bonus.

In [35]:
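# salary and bonus were read as strings (no schema inference), so Spark casts
# them to double for the addition; hence the 83000.0-style values.
df_emp=df_emp.withColumn("Total_ctc",F.col("salary")+F.col("bonus"))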
df_emp.show()

+------+-----+-------+------+-----+---------+
|emp_id| name|   dept|salary|bonus|Total_ctc|
+------+-----+-------+------+-----+---------+
|     1|Arjun|     IT| 78000| 5000|  83000.0|
|     2|Kavya|     HR| 62000| 2000|  64000.0|
|     3|Sneha|Finance| 55000| 3000|  58000.0|
+------+-----+-------+------+-----+---------+



Filter where total_ctc > 60,000.

In [36]:
df_emp.filter(F.col("Total_ctc")>60000).show()

+------+-----+----+------+-----+---------+
|emp_id| name|dept|salary|bonus|Total_ctc|
+------+-----+----+------+-----+---------+
|     1|Arjun|  IT| 78000| 5000|  83000.0|
|     2|Kavya|  HR| 62000| 2000|  64000.0|
+------+-----+----+------+-----+---------+
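
The clean, transform and filter steps can be cross-checked without Spark using the standard `csv` module. This sketch parses the same raw text, fills the missing bonus with 2000, and computes integer totals. Casting to `int` is an assumption made here; the Spark cells operate on string columns and show doubles instead.

```python
import csv
import io

raw = """emp_id,name,dept,salary,bonus
1,Arjun,IT,78000,5000
2,Kavya,HR,62000,
3,Sneha,Finance,55000,3000"""

employees = []
for row in csv.DictReader(io.StringIO(raw)):
    # An empty bonus field arrives as "", so fall back to the default of 2000.
    bonus = int(row["bonus"]) if row["bonus"] else 2000
    salary = int(row["salary"])
    employees.append({**row, "salary": salary, "bonus": bonus, "total_ctc": salary + bonus})

# Keep only employees whose total CTC exceeds 60,000.
high_ctc = [e["name"] for e in employees if e["total_ctc"] > 60000]

print([e["total_ctc"] for e in employees])
print(high_ctc)
```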



Save final DataFrame to Parquet and JSON.

In [37]:
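# Write df_emp (not the starter df from Exercise 1.2). DataFrame.toJSON only
# returns an RDD of JSON strings and does not write a file, so use write.json.
df_emp.write.mode("overwrite").parquet("/tmp/employee.parquet")
df_emp.write.mode("overwrite").json("Emp.json")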