# **1. PySpark Setup & Initialization**

**Exercise 1.1 – Set up Spark:**

Initialize SparkSession with:

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("BotCampus Intermediate Session") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

**Exercise 1.2 – Load starter data:**

In [2]:
data = [("Ananya", "Bangalore", 24),
        ("Ravi", "Hyderabad", 28),
        ("Kavya", "Delhi", 22),
        ("Meena", "Chennai", 25)]
columns = ["name", "city", "age"]

df = spark.createDataFrame(data, columns)
df.show()

+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Ananya|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
+------+---------+---+



# **2. RDDs & Transformations**

**Exercise 2.1 – Create RDD from feedback:**

In [4]:
feedback = sc.parallelize([
    "Ravi from Bangalore loved the mobile app",
    "Meena from Delhi reported poor response time",
    "Ajay from Pune liked the delivery speed",
    "Ananya from Hyderabad had an issue with UI",
    "Rohit from Mumbai gave positive feedback"
])

**Tasks:**

Count total number of words.

In [6]:
word = feedback.flatMap(lambda x: x.split(" "))
word_count = word.count()
print("The number of words are: ", word_count)

The number of words are:  35


Find top 3 most common words.

In [9]:
from collections import Counter

common_words = word.map(lambda x: x.lower()).countByValue()
common_words = dict(Counter(common_words).most_common(3))
print("The top 3 most common words are: ", common_words)

The top 3 most common words are:  {'from': 5, 'the': 2, 'ravi': 1}
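
The third entry above is one of many words tied at a count of 1, so which word appears depends on the order in which the counts come back. A minimal plain-Python sketch of a reproducible top-N, assuming ties should be broken alphabetically (the tie-break rule and the helper name `top_n` are assumptions, not part of the exercise):

```python
from collections import Counter

feedback_lines = [
    "Ravi from Bangalore loved the mobile app",
    "Meena from Delhi reported poor response time",
    "Ajay from Pune liked the delivery speed",
    "Ananya from Hyderabad had an issue with UI",
    "Rohit from Mumbai gave positive feedback",
]

def top_n(lines, n):
    """Return the n most frequent lowercased words; ties are broken alphabetically (assumed rule)."""
    counts = Counter(w.lower() for line in lines for w in line.split(" "))
    # Sort by descending count, then by word, so the result is reproducible
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]

top3 = top_n(feedback_lines, 3)
print(top3)  # [('from', 5), ('the', 2), ('ajay', 1)]
```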


Remove stop words (from, with, the, etc.).

In [10]:
# Stop words to drop; the comparison is case-insensitive
stop_words = {"from", "with", "the", "had", "an", "and"}

fil_word = word.filter(lambda x: x.lower() not in stop_words)
count = fil_word.count()
print("The number of words are: ", count)

The number of words are:  25
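
As a sanity check on the figure above, the same filter can be reproduced without Spark. This is a plain-Python sketch over the same five sentences; the variable names are illustrative only:

```python
feedback_lines = [
    "Ravi from Bangalore loved the mobile app",
    "Meena from Delhi reported poor response time",
    "Ajay from Pune liked the delivery speed",
    "Ananya from Hyderabad had an issue with UI",
    "Rohit from Mumbai gave positive feedback",
]
stop_words = {"from", "with", "the", "had", "an", "and"}

# Split every sentence on single spaces, as the flatMap step does
words = [w for line in feedback_lines for w in line.split(" ")]
# Keep only words whose lowercase form is not a stop word
kept = [w for w in words if w.lower() not in stop_words]
removed = len(words) - len(kept)

print(len(words), removed, len(kept))  # 35 10 25
```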


Create a dictionary of word → count.

In [12]:
word_dict = dict(word.map(lambda x: (x.lower(), 1)).reduceByKey(lambda a, b: a + b).collect())
print("Word dictionary:", word_dict)

Word dictionary: {'from': 5, 'loved': 1, 'app': 1, 'poor': 1, 'response': 1, 'liked': 1, 'speed': 1, 'ananya': 1, 'an': 1, 'issue': 1, 'with': 1, 'rohit': 1, 'mumbai': 1, 'positive': 1, 'feedback': 1, 'ravi': 1, 'bangalore': 1, 'the': 2, 'mobile': 1, 'meena': 1, 'delhi': 1, 'reported': 1, 'time': 1, 'ajay': 1, 'pune': 1, 'delivery': 1, 'hyderabad': 1, 'had': 1, 'ui': 1, 'gave': 1}


# **3. DataFrames – Transformations**

**Exercise 3.1 – Create exam_scores DataFrame:**

In [55]:
scores = [
    ("Ravi", "Math", 88),
    ("Ananya", "Science", 92),
    ("Kavya", "English", 79),
    ("Ravi", "English", 67),
    ("Neha", "Math", 94),
    ("Meena", "Science", 85)
]
columns = ["name", "subject", "score"]

df_scores = spark.createDataFrame(scores, columns)

df_scores.show()

+------+-------+-----+
|  name|subject|score|
+------+-------+-----+
|  Ravi|   Math|   88|
|Ananya|Science|   92|
| Kavya|English|   79|
|  Ravi|English|   67|
|  Neha|   Math|   94|
| Meena|Science|   85|
+------+-------+-----+



Add grade column (>=90 → A, 80-89 → B, 70-79 → C, else D).

In [56]:
from pyspark.sql.functions import when, col

df_scores = df_scores.withColumn(
    "grade",
    when(col("score") >= 90, "A")
    .when(col("score") >= 80, "B")
    .when(col("score") >= 70, "C")
    .otherwise("D")
)

df_scores.show()

+------+-------+-----+-----+
|  name|subject|score|grade|
+------+-------+-----+-----+
|  Ravi|   Math|   88|    B|
|Ananya|Science|   92|    A|
| Kavya|English|   79|    C|
|  Ravi|English|   67|    D|
|  Neha|   Math|   94|    A|
| Meena|Science|   85|    B|
+------+-------+-----+-----+
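
The `when` chain is evaluated top to bottom and the first matching branch wins, which is why the `>= 80` branch needs no explicit upper bound of 89. A plain-Python sketch of the same logic (the function name `grade_for` is hypothetical):

```python
def grade_for(score):
    """Mirror the when/otherwise chain: conditions are checked in order, first match wins."""
    if score >= 90:
        return "A"
    if score >= 80:
        return "B"
    if score >= 70:
        return "C"
    return "D"

scores = [88, 92, 79, 67, 94, 85]
grades = [grade_for(s) for s in scores]
print(grades)  # ['B', 'A', 'C', 'D', 'A', 'B']
```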



Group by subject, find average score.

In [54]:
from pyspark.sql import functions as F

grouped = df_scores.groupBy("subject").agg(F.avg("score").alias("avg_score"))
grouped.show()

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
|English|     73.0|
+-------+---------+
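
The averages can be verified by hand or with a few lines of plain Python. The sketch below groups the same six rows with a `defaultdict`; it is a cross-check only, not a replacement for `groupBy`:

```python
from collections import defaultdict

scores = [
    ("Ravi", "Math", 88),
    ("Ananya", "Science", 92),
    ("Kavya", "English", 79),
    ("Ravi", "English", 67),
    ("Neha", "Math", 94),
    ("Meena", "Science", 85),
]

# Collect the scores of each subject, then average each group
by_subject = defaultdict(list)
for _name, subject, score in scores:
    by_subject[subject].append(score)

avg_score = {subject: sum(vals) / len(vals) for subject, vals in by_subject.items()}
print(avg_score)  # {'Math': 91.0, 'Science': 88.5, 'English': 73.0}
```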



Use when and otherwise to classify subject difficulty (Math/Science → Difficult, else Easy).

In [57]:
df_scores = df_scores.withColumn(
    "difficulty",
    when(col("subject").isin("Math", "Science"), "Difficult").otherwise("Easy")
)

df_scores.show()

+------+-------+-----+-----+----------+
|  name|subject|score|grade|difficulty|
+------+-------+-----+-----+----------+
|  Ravi|   Math|   88|    B| Difficult|
|Ananya|Science|   92|    A| Difficult|
| Kavya|English|   79|    C|      Easy|
|  Ravi|English|   67|    D|      Easy|
|  Neha|   Math|   94|    A| Difficult|
| Meena|Science|   85|    B| Difficult|
+------+-------+-----+-----+----------+



Rank students per subject using a Window function.

In [58]:
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("subject").orderBy(col("score").desc())
df_scores = df_scores.withColumn("rank", F.rank().over(windowSpec))

df_scores.show()

+------+-------+-----+-----+----------+----+
|  name|subject|score|grade|difficulty|rank|
+------+-------+-----+-----+----------+----+
| Kavya|English|   79|    C|      Easy|   1|
|  Ravi|English|   67|    D|      Easy|   2|
|  Neha|   Math|   94|    A| Difficult|   1|
|  Ravi|   Math|   88|    B| Difficult|   2|
|Ananya|Science|   92|    A| Difficult|   1|
| Meena|Science|   85|    B| Difficult|   2|
+------+-------+-----+-----+----------+----+
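
No two students share a score within a subject here, so `rank()` yields consecutive values. With ties, `rank()` leaves a gap after the tied rows whereas `dense_rank()` does not. A plain-Python sketch of both behaviours on a hypothetical list of scores containing a tie (the data and the function names are made up for illustration):

```python
def rank_with_gaps(scores):
    """Like SQL RANK(): tied scores share a rank and the next rank skips ahead."""
    ordered = sorted(scores, reverse=True)
    # A score's rank is 1 plus the number of strictly higher scores
    return [(s, 1 + sum(1 for other in ordered if other > s)) for s in ordered]

def rank_dense(scores):
    """Like SQL DENSE_RANK(): tied scores share a rank and no ranks are skipped."""
    ordered = sorted(scores, reverse=True)
    distinct = sorted(set(scores), reverse=True)
    return [(s, distinct.index(s) + 1) for s in ordered]

tied_scores = [94, 88, 94, 79]  # hypothetical data with a tie at the top
print(rank_with_gaps(tied_scores))  # [(94, 1), (94, 1), (88, 3), (79, 4)]
print(rank_dense(tied_scores))      # [(94, 1), (94, 1), (88, 2), (79, 3)]
```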



Apply UDF to format names (e.g., make all uppercase).

In [59]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def format_name(name):
    # Guard against NULL names, which reach a Python UDF as None
    return name.upper() if name is not None else None

df_scores = df_scores.withColumn("formatted", format_name("name"))

df_scores.show()

+------+-------+-----+-----+----------+----+---------+
|  name|subject|score|grade|difficulty|rank|formatted|
+------+-------+-----+-----+----------+----+---------+
| Kavya|English|   79|    C|      Easy|   1|    KAVYA|
|  Ravi|English|   67|    D|      Easy|   2|     RAVI|
|  Neha|   Math|   94|    A| Difficult|   1|     NEHA|
|  Ravi|   Math|   88|    B| Difficult|   2|     RAVI|
|Ananya|Science|   92|    A| Difficult|   1|   ANANYA|
| Meena|Science|   85|    B| Difficult|   2|    MEENA|
+------+-------+-----+-----+----------+----+---------+
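
Python UDFs receive SQL NULL as `None`, so it can help to keep the formatting logic in a plain function that is testable without a Spark session and to wrap it with `udf` afterwards. A sketch under that assumption; `format_name_py` is a hypothetical helper, not part of the exercise:

```python
def format_name_py(name):
    """Uppercase a name, passing None through unchanged (NULL-safe)."""
    return name.upper() if name is not None else None

# Plain-Python checks, no Spark session required
print(format_name_py("Kavya"))  # KAVYA
print(format_name_py(None))     # None
```

For a simple uppercase conversion, the built-in `pyspark.sql.functions.upper` does the same job without the overhead of a Python UDF.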



# **4. Ingest CSV & JSON – Save to Parquet**

**Dataset 1: CSV file students.csv**

**Tasks:**

Load both datasets into PySpark.

In [38]:
data = """id,name,department,city,salary
1,Amit,IT,Bangalore,78000
2,Kavya,HR,Chennai,62000
3,Arjun,Finance,Hyderabad,55000
"""

with open("students.csv", "w") as f:
    f.write(data)

df_csv = spark.read.csv("students.csv", header=True, inferSchema=True)

**Dataset 2: JSON file employee_nested.json**

In [39]:
import json

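json_data = [
    {
        "id": 101,
        "name": "Sneha",
        "address": {
            "city": "Mumbai",
            "pincode": 400001
        },
        "skills": ["Python", "Spark"]
    }
]

# Write one JSON object per line (JSON Lines), the layout spark.read.json expects by default
with open("employee_nested.json", "w") as f:
    for record in json_data:
        f.write(json.dumps(record) + "\n")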

df_json = spark.read.json("employee_nested.json")
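
When generating JSON test files from Python, note that `str()` on a dict or list produces a Python repr with single-quoted strings, which strict JSON parsers reject, whereas `json.dumps` emits valid JSON. Spark's JSON reader accepts single quotes by default (its `allowSingleQuotes` option), so the repr form may still load there, but other tools will not read it. The sketch below shows the difference using only the standard library:

```python
import json

record = {"id": 101, "name": "Sneha", "skills": ["Python", "Spark"]}

as_repr = str(record)         # Python repr: single-quoted strings
as_json = json.dumps(record)  # strict JSON: double-quoted strings

# A strict parser rejects the repr form
try:
    json.loads(as_repr)
    repr_is_valid_json = True
except json.JSONDecodeError:
    repr_is_valid_json = False

print(as_repr)
print(as_json)
print(repr_is_valid_json)  # False
```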

Print schema and infer nested structure.

In [40]:
df_csv.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)



In [41]:
df_json.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pincode: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)



Flatten the JSON (use explode, select, alias).

In [43]:
from pyspark.sql.functions import explode
df_flat = df_json.select(
    "id",
    "name",
    "address.city",
    "address.pincode",
    explode("skills").alias("skill")
)

df_flat.show()

+---+-----+------+-------+------+
| id| name|  city|pincode| skill|
+---+-----+------+-------+------+
|101|Sneha|Mumbai| 400001|Python|
|101|Sneha|Mumbai| 400001| Spark|
+---+-----+------+-------+------+
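
`explode` emits one output row per array element and repeats the other selected columns on each row. The equivalent reshaping in plain Python, as a sketch over the same record (the function name `flatten_employee` is hypothetical):

```python
def flatten_employee(record):
    """Return one flat dict per skill, copying the scalar and nested fields onto each row."""
    return [
        {
            "id": record["id"],
            "name": record["name"],
            "city": record["address"]["city"],
            "pincode": record["address"]["pincode"],
            "skill": skill,
        }
        for skill in record["skills"]
    ]

employee = {
    "id": 101,
    "name": "Sneha",
    "address": {"city": "Mumbai", "pincode": 400001},
    "skills": ["Python", "Spark"],
}

rows = flatten_employee(employee)
for row in rows:
    print(row)
```

Like this sketch, `explode` produces no rows for an empty array; `explode_outer` keeps such records with a null element instead.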



Convert both to Parquet and write to /tmp/output.

In [44]:
df_csv.write.mode("overwrite").parquet("/tmp/output/students_parquet")
df_flat.write.mode("overwrite").parquet("/tmp/output/employees_parquet")

# **5. Spark SQL – Temp Views & Queries**

**Exercise 5.1 – Create view from exam scores and run:**

In [61]:
df_scores.createOrReplaceTempView("exam_scores")

a) Top scorer per subject

In [64]:
spark.sql("SELECT subject, name, score FROM ( SELECT *, RANK() OVER (PARTITION BY subject ORDER BY score DESC) as rnk FROM exam_scores) WHERE rnk = 1").show()

+-------+------+-----+
|subject|  name|score|
+-------+------+-----+
|English| Kavya|   79|
|   Math|  Neha|   94|
|Science|Ananya|   92|
+-------+------+-----+



b) Count of students per grade

In [63]:
spark.sql("SELECT grade, COUNT(*) as student_count FROM exam_scores GROUP BY grade").show()

+-----+-------------+
|grade|student_count|
+-----+-------------+
|    B|            2|
|    C|            1|
|    A|            2|
|    D|            1|
+-----+-------------+



c) Students with multiple subjects

In [65]:
spark.sql("SELECT name, COUNT(*) as subjects FROM exam_scores GROUP BY name HAVING subjects > 1").show()

+----+--------+
|name|subjects|
+----+--------+
|Ravi|       2|
+----+--------+



d) Subjects with average score above 85

In [66]:
spark.sql("SELECT subject, AVG(score) as avg_score FROM exam_scores GROUP BY subject HAVING avg_score > 85").show()

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
+-------+---------+
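
Queries (c) and (d) use only standard SQL aggregation, so they can be cross-checked outside Spark. The sketch below uses Python's built-in `sqlite3` module as a stand-in engine. That choice is an assumption made for testing convenience: SQLite's dialect is not identical to Spark SQL, and the `HAVING` clauses here repeat the aggregate expression instead of the alias to stay portable.

```python
import sqlite3

scores = [
    ("Ravi", "Math", 88),
    ("Ananya", "Science", 92),
    ("Kavya", "English", 79),
    ("Ravi", "English", 67),
    ("Neha", "Math", 94),
    ("Meena", "Science", 85),
]

# In-memory database standing in for the exam_scores temp view
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE exam_scores (name TEXT, subject TEXT, score INTEGER)")
conn.executemany("INSERT INTO exam_scores VALUES (?, ?, ?)", scores)

# c) Students with multiple subjects
multi_subject = conn.execute(
    "SELECT name, COUNT(*) AS subjects FROM exam_scores "
    "GROUP BY name HAVING COUNT(*) > 1 ORDER BY name"
).fetchall()

# d) Subjects with average score above 85
high_avg = conn.execute(
    "SELECT subject, AVG(score) AS avg_score FROM exam_scores "
    "GROUP BY subject HAVING AVG(score) > 85 ORDER BY subject"
).fetchall()

print(multi_subject)  # [('Ravi', 2)]
print(high_avg)       # [('Math', 91.0), ('Science', 88.5)]
conn.close()
```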



**Exercise 5.2 – Create another DataFrame attendance(name, days_present) and join with scores:**

In [68]:
attendance = [("Ravi", 22), ("Ananya", 18), ("Kavya", 25), ("Neha", 21), ("Meena", 15)]
df_attendance = spark.createDataFrame(attendance, ["name", "days_present"])

df_joined = df_scores.join(df_attendance, on="name", how="left")
df_joined.show()

+------+-------+-----+-----+----------+----+---------+------------+
|  name|subject|score|grade|difficulty|rank|formatted|days_present|
+------+-------+-----+-----+----------+----+---------+------------+
|Ananya|Science|   92|    A| Difficult|   1|   ANANYA|          18|
|  Ravi|English|   67|    D|      Easy|   2|     RAVI|          22|
|  Ravi|   Math|   88|    B| Difficult|   2|     RAVI|          22|
|  Neha|   Math|   94|    A| Difficult|   1|     NEHA|          21|
| Meena|Science|   85|    B| Difficult|   2|    MEENA|          15|
| Kavya|English|   79|    C|      Easy|   1|    KAVYA|          25|
+------+-------+-----+-----+----------+----+---------+------------+



Calculate attendance-adjusted grade:
If days_present < 20 → downgrade grade by one level (D stays D)

In [69]:
df_adjusted = df_joined.withColumn(
    "final_grade",
    when(col("days_present") < 20,
         when(col("grade") == "A", "B")
        .when(col("grade") == "B", "C")
        .when(col("grade") == "C", "D")
        .otherwise("D")
    ).otherwise(col("grade"))
)

df_adjusted.select("name", "subject", "score", "grade", "days_present", "final_grade").show()

+------+-------+-----+-----+------------+-----------+
|  name|subject|score|grade|days_present|final_grade|
+------+-------+-----+-----+------------+-----------+
|Ananya|Science|   92|    A|          18|          B|
|  Ravi|   Math|   88|    B|          22|          B|
| Kavya|English|   79|    C|          25|          C|
|  Ravi|English|   67|    D|          22|          D|
|  Neha|   Math|   94|    A|          21|          A|
| Meena|Science|   85|    B|          15|          C|
+------+-------+-----+-----+------------+-----------+
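
The nested `when` amounts to a lookup table that is applied only when attendance falls below the threshold. A plain-Python sketch of the same rule (the names `DOWNGRADE` and `final_grade` are illustrative):

```python
DOWNGRADE = {"A": "B", "B": "C", "C": "D", "D": "D"}  # D is already the lowest grade

def final_grade(grade, days_present, min_days=20):
    """Drop the grade one level when attendance is below min_days."""
    return DOWNGRADE[grade] if days_present < min_days else grade

# (name, grade, days_present) taken from the joined table
rows = [
    ("Ananya", "A", 18),
    ("Ravi", "B", 22),
    ("Kavya", "C", 25),
    ("Ravi", "D", 22),
    ("Neha", "A", 21),
    ("Meena", "B", 15),
]
adjusted = [(name, final_grade(grade, days)) for name, grade, days in rows]
print(adjusted)
# [('Ananya', 'B'), ('Ravi', 'B'), ('Kavya', 'C'), ('Ravi', 'D'), ('Neha', 'A'), ('Meena', 'C')]
```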



# **6. Partitioned Load (Full + Incremental)**

**Initial Load:**

In [70]:
df_scores.write.mode("overwrite").partitionBy("subject").parquet("/tmp/scores/")

**Incremental Load:**

In [71]:
incremental = [("Meena", "Math", 93)]
df_inc = spark.createDataFrame(incremental, ["name", "subject", "score"])
df_inc.write.mode("append").partitionBy("subject").parquet("/tmp/scores/")

**Task:**

List all folders inside /tmp/scores/

In [72]:
import os
print("Partitions:", os.listdir("/tmp/scores/"))

Partitions: ['._SUCCESS.crc', 'subject=Science', 'subject=English', '_SUCCESS', 'subject=Math']
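
The listing mixes partition directories with Spark's marker files (`_SUCCESS` and its checksum). Partition directories follow the `column=value` naming convention, so they can be separated by name. A sketch that works on the listing shown above rather than touching the filesystem (the helper name `partition_values` is hypothetical):

```python
def partition_values(entries, column):
    """Extract the values of `column` from column=value partition directory names."""
    prefix = column + "="
    return sorted(e[len(prefix):] for e in entries if e.startswith(prefix))

listing = ['._SUCCESS.crc', 'subject=Science', 'subject=English', '_SUCCESS', 'subject=Math']
subjects = partition_values(listing, "subject")
print(subjects)  # ['English', 'Math', 'Science']
```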


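Read only the Math partition and display all entries. Because the path points directly at the partition directory, the subject column is not part of the result. The appended Meena row shows NULL for grade, difficulty, rank, and formatted because the incremental DataFrame contained only name, subject, and score.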

In [73]:
df_math = spark.read.parquet("/tmp/scores/subject=Math")
df_math.show()

+-----+-----+-----+----------+----+---------+
| name|score|grade|difficulty|rank|formatted|
+-----+-----+-----+----------+----+---------+
| Neha|   94|    A| Difficult|   1|     NEHA|
| Ravi|   88|    B| Difficult|   2|     RAVI|
|Meena|   93| NULL|      NULL|NULL|     NULL|
+-----+-----+-----+----------+----+---------+



# **7. ETL: Clean, Transform, Load**

**Raw CSV:**

In [76]:
csv_data = """emp_id,name,dept,salary,bonus
1,Arjun,IT,78000,5000
2,Kavya,HR,62000,
3,Sneha,Finance,55000,3000
"""

with open("raw_employee.csv", "w") as f:
    f.write(csv_data)

Load data with header.

In [77]:
df_raw = spark.read.csv("raw_employee.csv", header=True, inferSchema=True)
df_raw.show()

+------+-----+-------+------+-----+
|emp_id| name|   dept|salary|bonus|
+------+-----+-------+------+-----+
|     1|Arjun|     IT| 78000| 5000|
|     2|Kavya|     HR| 62000| NULL|
|     3|Sneha|Finance| 55000| 3000|
+------+-----+-------+------+-----+



Fill missing bonus with 2000.

In [79]:
df_filled = df_raw.fillna({"bonus": 2000})
df_filled.show()

+------+-----+-------+------+-----+
|emp_id| name|   dept|salary|bonus|
+------+-----+-------+------+-----+
|     1|Arjun|     IT| 78000| 5000|
|     2|Kavya|     HR| 62000| 2000|
|     3|Sneha|Finance| 55000| 3000|
+------+-----+-------+------+-----+



Calculate total_ctc = salary + bonus.

In [80]:
df_ctc = df_filled.withColumn("total_ctc", col("salary") + col("bonus"))
df_ctc.show()

+------+-----+-------+------+-----+---------+
|emp_id| name|   dept|salary|bonus|total_ctc|
+------+-----+-------+------+-----+---------+
|     1|Arjun|     IT| 78000| 5000|    83000|
|     2|Kavya|     HR| 62000| 2000|    64000|
|     3|Sneha|Finance| 55000| 3000|    58000|
+------+-----+-------+------+-----+---------+



Filter where total_ctc > 60,000.

In [81]:
df_filtered = df_ctc.filter(col("total_ctc") > 60000)
df_filtered.show()

+------+-----+----+------+-----+---------+
|emp_id| name|dept|salary|bonus|total_ctc|
+------+-----+----+------+-----+---------+
|     1|Arjun|  IT| 78000| 5000|    83000|
|     2|Kavya|  HR| 62000| 2000|    64000|
+------+-----+----+------+-----+---------+
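
The fill, compute, and filter steps can be cross-checked with the standard `csv` module. This is a plain-Python sketch of the same pipeline on the same raw text, assuming an empty bonus field means a missing value (the constant and variable names are illustrative):

```python
import csv
import io

csv_data = """emp_id,name,dept,salary,bonus
1,Arjun,IT,78000,5000
2,Kavya,HR,62000,
3,Sneha,Finance,55000,3000
"""

DEFAULT_BONUS = 2000
CTC_THRESHOLD = 60000

result = []
for row in csv.DictReader(io.StringIO(csv_data)):
    salary = int(row["salary"])
    # Treat an empty bonus field as missing and substitute the default
    bonus = int(row["bonus"]) if row["bonus"] else DEFAULT_BONUS
    total_ctc = salary + bonus
    # Keep only employees above the threshold
    if total_ctc > CTC_THRESHOLD:
        result.append((row["name"], total_ctc))

print(result)  # [('Arjun', 83000), ('Kavya', 64000)]
```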



Save final DataFrame to Parquet and JSON.

In [82]:
df_filtered.write.mode("overwrite").parquet("/tmp/final_ctc_parquet")
df_filtered.write.mode("overwrite").json("/tmp/final_ctc_json")