# 1. PySpark Setup & Initialization

Exercise 1.1 – Setup Spark:

In [2]:
# Download Spark
!wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz


--2025-08-05 06:28:24--  https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400395283 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.0-bin-hadoop3.tgz’


2025-08-05 06:35:06 (976 KB/s) - ‘spark-3.5.0-bin-hadoop3.tgz’ saved [400395283/400395283]



In [1]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("BotCampus Intermediate Session") \
    .master("local[*]") \
    .getOrCreate()


Exercise 1.2 – Load starter data

In [7]:
 data = [("Ananya", "Bangalore", 24),
("Ravi", "Hyderabad", 28),
("Kavya", "Delhi", 22),
("Meena", "Chennai", 25)]
columns = ["name", "city", "age"]
df = spark.createDataFrame(data, columns)
df.show()

+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Ananya|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
+------+---------+---+



# 2. RDDs & Transformations

 Exercise 2.1 – Create RDD from feedback

In [8]:
 feedback = spark.sparkContext.parallelize([
"Ravi from Bangalore loved the mobile app",
"Meena from Delhi reported poor response time",
"Ajay from Pune liked the delivery speed",
"Ananya from Hyderabad had an issue with UI",
"Rohit from Mumbai gave positive feedback"
])

Task 1: Count total number of words.

In [9]:
total_words = feedback.flatMap(lambda x: x.split()).count()
print("Total Words:", total_words)

Total Words: 35


 2. Find top 3 most common words.

In [46]:
from collections import Counter
top3 = Counter(feedback.flatMap(lambda x: x.split()).collect()).most_common(3)
print("Top 3 words:", top3)

Top 3 words: [('from', 5), ('the', 2), ('Ravi', 1)]


3. Remove stop words (from , with , the etc.).

In [47]:
stopwords = {"from", "with", "the", "an", "and", "had"}
filtered_words = feedback.flatMap(lambda x: [w for w in x.lower().split() if w not in stopwords])
print("Filtered words:", filtered_words.collect())

Filtered words: ['ravi', 'bangalore', 'loved', 'mobile', 'app', 'meena', 'delhi', 'reported', 'poor', 'response', 'time', 'ajay', 'pune', 'liked', 'delivery', 'speed', 'ananya', 'hyderabad', 'issue', 'ui', 'rohit', 'mumbai', 'gave', 'positive', 'feedback']


4. Create a dictionary of word → count.

In [49]:
word_count_dict = dict(Counter(filtered_words.collect()))
print("Word counts:", word_count_dict)

Word counts: {'ravi': 1, 'bangalore': 1, 'loved': 1, 'mobile': 1, 'app': 1, 'meena': 1, 'delhi': 1, 'reported': 1, 'poor': 1, 'response': 1, 'time': 1, 'ajay': 1, 'pune': 1, 'liked': 1, 'delivery': 1, 'speed': 1, 'ananya': 1, 'hyderabad': 1, 'issue': 1, 'ui': 1, 'rohit': 1, 'mumbai': 1, 'gave': 1, 'positive': 1, 'feedback': 1}


# 3. DataFrames – Transformations

 Exercise 3.1 – Create exam_scores DataFrame:

In [62]:
from pyspark.sql.functions import when, col, upper
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

In [66]:
scores = [
    ("Ravi", "Math", 88),
    ("Ananya", "Science", 92),
    ("Kavya", "English", 79),
    ("Ravi", "English", 67),
    ("Neha", "Math", 94),
    ("Meena", "Science", 85)
]
columns = ["name", "subject", "score"]
df_scores = spark.createDataFrame(scores, columns)

 Task 1: Add grade column (>=90 → A, 80-89 → B,
70-79 → C, else D)

In [67]:
df_scores = df_scores.withColumn("grade", when(col("score") >= 90, "A")
                                 .when(col("score") >= 80, "B")
                                 .when(col("score") >= 70, "C")
                                 .otherwise("D"))
df_scores.show()

+------+-------+-----+-----+
|  name|subject|score|grade|
+------+-------+-----+-----+
|  Ravi|   Math|   88|    B|
|Ananya|Science|   92|    A|
| Kavya|English|   79|    C|
|  Ravi|English|   67|    D|
|  Neha|   Math|   94|    A|
| Meena|Science|   85|    B|
+------+-------+-----+-----+



2. Group by subject, find average score.

In [68]:
df_scores.groupBy("subject").avg("score").show()

+-------+----------+
|subject|avg(score)|
+-------+----------+
|Science|      88.5|
|   Math|      91.0|
|English|      73.0|
+-------+----------+



 3. Use when and otherwise to classify subject difficulty (Difficult).

In [69]:
df_scores = df_scores.withColumn("difficulty",
            when(col("subject").isin("Math", "Science"), "Difficult")
            .otherwise("Easy"))
df_scores.show()


+------+-------+-----+-----+----------+
|  name|subject|score|grade|difficulty|
+------+-------+-----+-----+----------+
|  Ravi|   Math|   88|    B| Difficult|
|Ananya|Science|   92|    A| Difficult|
| Kavya|English|   79|    C|      Easy|
|  Ravi|English|   67|    D|      Easy|
|  Neha|   Math|   94|    A| Difficult|
| Meena|Science|   85|    B| Difficult|
+------+-------+-----+-----+----------+



4. Rank students per subject using Window function.

In [70]:
windowSpec = Window.partitionBy("subject").orderBy(col("score").desc())
df_scores = df_scores.withColumn("rank", rank().over(windowSpec))
df_scores.select("name", "subject", "score", "rank").show()

+------+-------+-----+----+
|  name|subject|score|rank|
+------+-------+-----+----+
| Kavya|English|   79|   1|
|  Ravi|English|   67|   2|
|  Neha|   Math|   94|   1|
|  Ravi|   Math|   88|   2|
|Ananya|Science|   92|   1|
| Meena|Science|   85|   2|
+------+-------+-----+----+



5. Apply UDF to format names (e.g., make all uppercase).

In [71]:
df_scores = df_scores.withColumn("name_upper", upper(col("name")))
df_scores.show()

+------+-------+-----+-----+----------+----+----------+
|  name|subject|score|grade|difficulty|rank|name_upper|
+------+-------+-----+-----+----------+----+----------+
| Kavya|English|   79|    C|      Easy|   1|     KAVYA|
|  Ravi|English|   67|    D|      Easy|   2|      RAVI|
|  Neha|   Math|   94|    A| Difficult|   1|      NEHA|
|  Ravi|   Math|   88|    B| Difficult|   2|      RAVI|
|Ananya|Science|   92|    A| Difficult|   1|    ANANYA|
| Meena|Science|   85|    B| Difficult|   2|     MEENA|
+------+-------+-----+-----+----------+----+----------+



# 4. Ingest CSV & JSON – Save to Parquet

 Dataset 1: CSV file: students.csv

In [22]:
csv_data = """id,name,department,city,salary
1,Amit,IT,Bangalore,78000
2,Kavya,HR,Chennai,62000
3,Arjun,Finance,Hyderabad,55000"""
with open("students.csv", "w") as f:
    f.write(csv_data)


 Dataset 2: JSON file employee_nested.json

In [23]:
json_data = """
[
  {
    "id": 101,
    "name": "Sneha",
    "address": {
      "city": "Mumbai",
      "pincode": 400001
    },
    "skills": ["Python", "Spark"]
  }
]
"""
with open("employee_nested.json", "w") as f:
    f.write(json_data)


 Task 1: Load both datasets into PySpark and Print schema and infer nested structure.

In [24]:
df_csv = spark.read.csv("students.csv", header=True, inferSchema=True)

# Show contents and schema
df_csv.show()
df_csv.printSchema()



+---+-----+----------+---------+------+
| id| name|department|     city|salary|
+---+-----+----------+---------+------+
|  1| Amit|        IT|Bangalore| 78000|
|  2|Kavya|        HR|  Chennai| 62000|
|  3|Arjun|   Finance|Hyderabad| 55000|
+---+-----+----------+---------+------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)



In [25]:
df_json = spark.read.json("employee_nested.json", multiLine=True)

# Show contents and schema (nested)
df_json.show(truncate=False)
df_json.printSchema()


+----------------+---+-----+---------------+
|address         |id |name |skills         |
+----------------+---+-----+---------------+
|{Mumbai, 400001}|101|Sneha|[Python, Spark]|
+----------------+---+-----+---------------+

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pincode: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)



 2. Flatten the JSON (use explode ,select, alias ).

In [26]:
from pyspark.sql.functions import explode, col

df_flat = df_json.select(
    "id",
    "name",
    col("address.city").alias("city"),
    col("address.pincode").alias("pincode"),
    explode("skills").alias("skill")
)

df_flat.show(truncate=False)
df_flat.printSchema()


+---+-----+------+-------+------+
|id |name |city  |pincode|skill |
+---+-----+------+-------+------+
|101|Sneha|Mumbai|400001 |Python|
|101|Sneha|Mumbai|400001 |Spark |
+---+-----+------+-------+------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- pincode: long (nullable = true)
 |-- skill: string (nullable = true)



3. Convert both to Parquet and write to
/tmp/output .

In [27]:
#Save CSV DataFrame to Parquet
df_csv.write.mode("overwrite").parquet("/tmp/output/students_parquet")

#Save flattened JSON DataFrame to Parquet
df_flat.write.mode("overwrite").parquet("/tmp/output/employees_parquet")



# 5. Spark SQL – Temp Views & Queries

Exercise 5.1 Create view from exam scores and run:

In [28]:
df_scores.createOrReplaceTempView("scores")

Task 1: Top scorer per subject

In [29]:
spark.sql("""
SELECT subject, name, score FROM (
  SELECT *, RANK() OVER(PARTITION BY subject ORDER BY score DESC) AS rnk FROM scores
) WHERE rnk = 1
""").show()

+-------+------+-----+
|subject|  name|score|
+-------+------+-----+
|English| Kavya|   79|
|   Math|  Neha|   94|
|Science|Ananya|   92|
+-------+------+-----+



2. Count of students per grade

In [30]:
spark.sql("SELECT grade, COUNT(*) as count FROM scores GROUP BY grade").show()

+-----+-----+
|grade|count|
+-----+-----+
|    B|    2|
|    C|    1|
|    A|    2|
|    D|    1|
+-----+-----+



3. Students with multiple subjects

In [31]:
spark.sql("""
SELECT name FROM scores
GROUP BY name HAVING COUNT(DISTINCT subject) > 1
""").show()

+----+
|name|
+----+
|Ravi|
+----+



4. Subjects with avg score > 85

In [32]:
spark.sql("""
SELECT subject, AVG(score) as avg_score
FROM scores
GROUP BY subject
HAVING avg_score > 85
""").show()

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
+-------+---------+



 Exercise 5.2 Create another DataFrame
attendance(name, days_present) and

In [33]:
attendance_data = [
    ("Ravi", 18),
    ("Ananya", 25),
    ("Kavya", 22),
    ("Neha", 30),
    ("Meena", 15)
]

att = spark.createDataFrame(attendance_data, ["name", "days_present"])
att.show()

+------+------------+
|  name|days_present|
+------+------------+
|  Ravi|          18|
|Ananya|          25|
| Kavya|          22|
|  Neha|          30|
| Meena|          15|
+------+------------+



Task 1: Join with scores

In [34]:
joined_df = df_scores.join(att, on="name")
joined_df.select("name", "subject", "score", "grade", "days_present").show()

+------+-------+-----+-----+------------+
|  name|subject|score|grade|days_present|
+------+-------+-----+-----+------------+
|Ananya|Science|   92|    A|          25|
| Kavya|English|   79|    C|          22|
| Meena|Science|   85|    B|          15|
|  Neha|   Math|   94|    A|          30|
|  Ravi|   Math|   88|    B|          18|
|  Ravi|English|   67|    D|          18|
+------+-------+-----+-----+------------+



2. Calculate attendance-adjusted grade

In [35]:
from pyspark.sql.functions import when, col

adjusted_df = joined_df.withColumn("adj_grade",
    when(col("days_present") < 20,
         when(col("grade") == "A", "B")
        .when(col("grade") == "B", "C")
        .when(col("grade") == "C", "D")
        .otherwise("D"))
    .otherwise(col("grade"))
)

adjusted_df.select("name", "subject", "score", "grade", "days_present", "adj_grade").show()


+------+-------+-----+-----+------------+---------+
|  name|subject|score|grade|days_present|adj_grade|
+------+-------+-----+-----+------------+---------+
|Ananya|Science|   92|    A|          25|        A|
| Kavya|English|   79|    C|          22|        C|
| Meena|Science|   85|    B|          15|        C|
|  Neha|   Math|   94|    A|          30|        A|
|  Ravi|   Math|   88|    B|          18|        C|
|  Ravi|English|   67|    D|          18|        D|
+------+-------+-----+-----+------------+---------+



# 6. Partitioned Load

In [36]:
#Initial Load
df_scores.write.partitionBy("subject").parquet("/tmp/scores/")

In [37]:
#Incremental Load
incremental = [("Meena", "Math", 93)]
df_inc = spark.createDataFrame(incremental, columns)
df_inc.write.mode("append").partitionBy("subject").parquet("/tmp/scores/")

Task 1: List all folders inside /tmp/scores/

In [38]:
import os

path = "/tmp/scores/"

# List only directories (partitions)
partitions = [f for f in os.listdir(path) if os.path.isdir(os.path.join(path, f))]
print("Partitions in /tmp/scores/:")
for folder in partitions:
    print(folder)


Partitions in /tmp/scores/:
subject=Science
subject=English
subject=Math


2. Read only Math partition and display all entries.

In [39]:
df_math = spark.read.parquet("/tmp/scores/subject=Math")
df_math.show()

+-----+-----+-----+----------+----+----------+
| name|score|grade|difficulty|rank|name_upper|
+-----+-----+-----+----------+----+----------+
| Neha|   94|    A| Difficult|   1|      NEHA|
| Ravi|   88|    B| Difficult|   2|      RAVI|
|Meena|   93| NULL|      NULL|NULL|      NULL|
+-----+-----+-----+----------+----+----------+



# 7. ETL: Clean, Transform, Load

In [3]:
raw_data = """emp_id,name,dept,salary,bonus
1,Arjun,IT,78000,5000
2,Kavya,HR,62000,
3,Sneha,Finance,55000,3000"""
with open("emp_raw.csv", "w") as f:
    f.write(raw_data)


 Task 1: Load data with header.


In [4]:
df_etl = spark.read.csv("emp_raw.csv", header=True, inferSchema=True)

# Show data and schema to verify
df_etl.show()
df_etl.printSchema()


+------+-----+-------+------+-----+
|emp_id| name|   dept|salary|bonus|
+------+-----+-------+------+-----+
|     1|Arjun|     IT| 78000| 5000|
|     2|Kavya|     HR| 62000| NULL|
|     3|Sneha|Finance| 55000| 3000|
+------+-----+-------+------+-----+

root
 |-- emp_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- bonus: integer (nullable = true)



2. Fill missing bonus with 2000.

In [6]:
df_etl = df_etl.fillna({"bonus": 2000})
df_etl.show()

+------+-----+-------+------+-----+
|emp_id| name|   dept|salary|bonus|
+------+-----+-------+------+-----+
|     1|Arjun|     IT| 78000| 5000|
|     2|Kavya|     HR| 62000| 2000|
|     3|Sneha|Finance| 55000| 3000|
+------+-----+-------+------+-----+



3.  Calculate total_ctc = salary + bonus

In [7]:
from pyspark.sql.functions import col

df_etl = df_etl.withColumn("total_ctc", col("salary") + col("bonus"))
df_etl.select("emp_id", "name", "salary", "bonus", "total_ctc").show()

+------+-----+------+-----+---------+
|emp_id| name|salary|bonus|total_ctc|
+------+-----+------+-----+---------+
|     1|Arjun| 78000| 5000|    83000|
|     2|Kavya| 62000| 2000|    64000|
|     3|Sneha| 55000| 3000|    58000|
+------+-----+------+-----+---------+



 4. Filter where total_ctc > 60,000

In [10]:
df_final = df_etl.filter(col("total_ctc") > 60000)
df_final.show()


+------+-----+----+------+-----+---------+
|emp_id| name|dept|salary|bonus|total_ctc|
+------+-----+----+------+-----+---------+
|     1|Arjun|  IT| 78000| 5000|    83000|
|     2|Kavya|  HR| 62000| 2000|    64000|
+------+-----+----+------+-----+---------+



5. Save final DataFrame to Parquet and JSON

In [11]:
df_final.write.mode("overwrite").parquet("/tmp/final_emps")
df_final.write.mode("overwrite").json("/tmp/final_emps_json")