In [3]:
#Exercise-1.1
from pyspark.sql import SparkSession

# Creating Spark session
spark = SparkSession.builder \
    .appName("BotCampus Intermediate Session") \
    .master("local[*]") \
    .getOrCreate()

spark


In [2]:
#Exercise-1.2
#Loading the data
data = [("Ananya", "Bangalore", 24),
        ("Ravi", "Hyderabad", 28),
        ("Kavya", "Delhi", 22),
        ("Meena", "Chennai", 25)]

columns = ["name", "city", "age"]

df = spark.createDataFrame(data, columns)
df.show()


+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Ananya|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
+------+---------+---+



In [3]:
#Exercise-2.1
#Create RDD from feedback
feedback = spark.sparkContext.parallelize([
    "Ravi from Bangalore loved the mobile app",
    "Meena from Delhi reported poor response time",
    "Ajay from Pune liked the delivery speed",
    "Ananya from Hyderabad had an issue with UI",
    "Rohit from Mumbai gave positive feedback"
])


In [4]:
#Count total number of words
words=feedback.flatMap(lambda line:line.lower().split())
total_words=words.count()
print("Total number of words:",total_words)

Total number of words: 35


In [5]:
#Find top 3 most common words
word_counts=words.map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
top_3=word_counts.takeOrdered(3,key=lambda x:-x[1])
print("Top 3 common words",top_3)

Top 3 common words [('from', 5), ('the', 2), ('loved', 1)]


In [7]:
#Remove stop words ( from , with , the , etc.)
stopwords = {"from", "with", "the", "and", "an", "had", "was", "of"}
filtered_words=words.filter(lambda word:word.lower() not in stopwords)
filtered_counts=filtered_words.map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
#Creating a dictionary
word_count_dict=dict(filtered_counts.collect())
print(word_count_dict)

{'loved': 1, 'app': 1, 'poor': 1, 'response': 1, 'liked': 1, 'speed': 1, 'ananya': 1, 'issue': 1, 'rohit': 1, 'mumbai': 1, 'positive': 1, 'feedback': 1, 'ravi': 1, 'bangalore': 1, 'mobile': 1, 'meena': 1, 'delhi': 1, 'reported': 1, 'time': 1, 'ajay': 1, 'pune': 1, 'delivery': 1, 'hyderabad': 1, 'ui': 1, 'gave': 1}


In [4]:
#Exercise-3.1
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Data
scores = [
    ("Ravi", "Math", 88),
    ("Ananya", "Science", 92),
    ("Kavya", "English", 79),
    ("Ravi", "English", 67),
    ("Neha", "Math", 94),
    ("Meena", "Science", 85)
]

columns = ["name", "subject", "score"]

# Create DataFrame
df_scores = spark.createDataFrame(scores, columns)

# Show
df_scores.show()


+------+-------+-----+
|  name|subject|score|
+------+-------+-----+
|  Ravi|   Math|   88|
|Ananya|Science|   92|
| Kavya|English|   79|
|  Ravi|English|   67|
|  Neha|   Math|   94|
| Meena|Science|   85|
+------+-------+-----+



In [5]:
#Add grade column ( >=90 → A, 80-89 → B, 70-79 → C, else D).
df_scores=df_scores.withColumn("grade",
                               F.when(F.col("score")>=90,"A")
                               .when((F.col("score")>=80) & (F.col("score")<90),"B")
                               .when((F.col("score")>=70) & (F.col("score")<80),"C")
                               .otherwise("D")
                              )
df_scores.show()



+------+-------+-----+-----+
|  name|subject|score|grade|
+------+-------+-----+-----+
|  Ravi|   Math|   88|    B|
|Ananya|Science|   92|    A|
| Kavya|English|   79|    C|
|  Ravi|English|   67|    D|
|  Neha|   Math|   94|    A|
| Meena|Science|   85|    B|
+------+-------+-----+-----+



In [6]:
#Group by subject, find average score.
avg_scores=df_scores.groupBy("subject").agg(F.avg("score").alias("avg_score"))
avg_scores.show()

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
|English|     73.0|
+-------+---------+



In [11]:
#Use when and otherwise to classify subject difficulty ( Math/Science =Difficult).
df_scores=df_scores.withColumn("difficulty",
                               F.when(F.col("subject").isin("Math","Science"),"Difficult")
                               .otherwise("Easy"))
df_scores.show()



+------+-------+-----+-----+----------+
|  name|subject|score|grade|difficulty|
+------+-------+-----+-----+----------+
|  Ravi|   Math|   88|    B| Difficult|
|Ananya|Science|   92|    A| Difficult|
| Kavya|English|   79|    C|      Easy|
|  Ravi|English|   67|    D|      Easy|
|  Neha|   Math|   94|    A| Difficult|
| Meena|Science|   85|    B| Difficult|
+------+-------+-----+-----+----------+



In [12]:
#Rank students per subject using Window function.
window_spec=Window.partitionBy("subject").orderBy(F.col("score").desc())
df_scores=df_scores.withColumn("rank",F.rank().over(window_spec))
df_scores.show()


+------+-------+-----+-----+----------+----+
|  name|subject|score|grade|difficulty|rank|
+------+-------+-----+-----+----------+----+
| Kavya|English|   79|    C|      Easy|   1|
|  Ravi|English|   67|    D|      Easy|   2|
|  Neha|   Math|   94|    A| Difficult|   1|
|  Ravi|   Math|   88|    B| Difficult|   2|
|Ananya|Science|   92|    A| Difficult|   1|
| Meena|Science|   85|    B| Difficult|   2|
+------+-------+-----+-----+----------+----+



In [7]:
#Apply UDF to format names (e.g., make all uppercase).
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
to_upper=udf(lambda x:x.upper(),StringType())
df_scores=df_scores.withColumn("name_upper",to_upper(F.col("name")))
df_scores.show()

+------+-------+-----+-----+----------+
|  name|subject|score|grade|name_upper|
+------+-------+-----+-----+----------+
|  Ravi|   Math|   88|    B|      RAVI|
|Ananya|Science|   92|    A|    ANANYA|
| Kavya|English|   79|    C|     KAVYA|
|  Ravi|English|   67|    D|      RAVI|
|  Neha|   Math|   94|    A|      NEHA|
| Meena|Science|   85|    B|     MEENA|
+------+-------+-----+-----+----------+



In [9]:
#Ingest CSV & JSON – Save to Parquet
from google.colab import files
files.upload()


Saving employee_nested.json to employee_nested.json
Saving students.csv to students.csv


{'employee_nested.json': b'\r\n[\r\n{\r\n"id": 101,\r\n"name": "Sneha",\r\n"address": {\r\n"city": "Mumbai",\r\n"pincode": 400001\r\n},\r\n"skills": ["Python", "Spark"]\r\n}\r\n]\t\r\n',
 'students.csv': b'\r\nid,name,department,city,salary\r\n1,Amit,IT,Bangalore,78000\r\n2,Kavya,HR,Chennai,62000\r\n3,Arjun,Finance,Hyderabad,5500\r\n'}

In [10]:
# Load CSV into df_csv
df_csv = spark.read.option("header", True).csv("students.csv")

# View data
df_csv.show()


+---+-----+----------+---------+------+
| id| name|department|     city|salary|
+---+-----+----------+---------+------+
|  1| Amit|        IT|Bangalore| 78000|
|  2|Kavya|        HR|  Chennai| 62000|
|  3|Arjun|   Finance|Hyderabad|  5500|
+---+-----+----------+---------+------+



In [11]:
# Load nested JSON
df_json = spark.read.option("multiline", True).json("employee_nested.json")

# View schema and data
df_json.printSchema()
df_json.show(truncate=False)


root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pincode: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+---+-----+---------------+
|address         |id |name |skills         |
+----------------+---+-----+---------------+
|{Mumbai, 400001}|101|Sneha|[Python, Spark]|
+----------------+---+-----+---------------+



In [12]:
#Flatten the JSON (use explode , select , alias ).
from pyspark.sql.functions import col,explode
df_flat=df_json.select(
    col("id"),
    col("name"),
    col("address.city").alias("city"),
    col("address.pincode").alias("pincode"),
    explode(col("skills")).alias("skill")
)
df_flat.show()

+---+-----+------+-------+------+
| id| name|  city|pincode| skill|
+---+-----+------+-------+------+
|101|Sneha|Mumbai| 400001|Python|
|101|Sneha|Mumbai| 400001| Spark|
+---+-----+------+-------+------+



In [13]:
#Convert both to Parquet and write to /tmp/output

df_csv.write.mode("overwrite").parquet("/tmp/output/students_parquet")

df_flat.write.mode("overwrite").parquet("/tmp/output/employees_parquet")


In [14]:
#. Spark SQL – Temp Views & Queries
from pyspark.sql import functions as F

scores = [
    ("Ravi", "Math", 88),
    ("Ananya", "Science", 92),
    ("Kavya", "English", 79),
    ("Ravi", "English", 67),
    ("Neha", "Math", 94),
    ("Meena", "Science", 85)
]

columns = ["name", "subject", "score"]

df_scores = spark.createDataFrame(scores, columns)

df_scores = df_scores.withColumn(
    "grade",
    F.when(F.col("score") >= 90, "A")
     .when((F.col("score") >= 80) & (F.col("score") < 90), "B")
     .when((F.col("score") >= 70) & (F.col("score") < 80), "C")
     .otherwise("D")
)

df_scores.createOrReplaceTempView("exam_scores")


In [15]:
#Exercise-5.1

#a) Top scorer per subject
spark.sql("""
SELECT subject,name,score
FROM(
  SELECT *,RANK() OVER(PARTITION BY subject ORDER BY score DESC) as rank
  FROM exam_scores
  )
  WHERE rank=1
  """).show()


+-------+------+-----+
|subject|  name|score|
+-------+------+-----+
|English| Kavya|   79|
|   Math|  Neha|   94|
|Science|Ananya|   92|
+-------+------+-----+



In [16]:
#b)Count of students per grade
spark.sql("""
SELECT grade,COUNT(*) as total_students
FROM exam_scores
GROUP BY grade
""").show()

+-----+--------------+
|grade|total_students|
+-----+--------------+
|    B|             2|
|    C|             1|
|    A|             2|
|    D|             1|
+-----+--------------+



In [18]:
#c) Students with multiple subjects
spark.sql("""
SELECT name,COUNT(DISTINCT subject) AS subject_count
FROM exam_scores
GROUP BY name
HAVING subject_count > 1
""").show()

+----+-------------+
|name|subject_count|
+----+-------------+
|Ravi|            2|
+----+-------------+



In [19]:
# d) Subjects with average score above
spark.sql("""
SELECT subject,AVG(score) as avg_score
FROM exam_scores
GROUP BY subject
HAVING AVG(score) > 85
""").show()

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
+-------+---------+



In [20]:
#Exercise-5.2
#Creating attendence data frame
attendance_data = [
    ("Ravi", 18),
    ("Ananya", 25),
    ("Kavya", 22),
    ("Neha", 19),
    ("Meena", 30)
]

columns = ["name", "days_present"]

df_attendance = spark.createDataFrame(attendance_data, columns)

df_attendance.show()


+------+------------+
|  name|days_present|
+------+------------+
|  Ravi|          18|
|Ananya|          25|
| Kavya|          22|
|  Neha|          19|
| Meena|          30|
+------+------------+



In [22]:
#Join with df_score
df_joined=df_scores.join(df_attendance,on="name",how="left")
df_joined.show()

+------+-------+-----+-----+------------+
|  name|subject|score|grade|days_present|
+------+-------+-----+-----+------------+
|Ananya|Science|   92|    A|          25|
|  Ravi|   Math|   88|    B|          18|
| Kavya|English|   79|    C|          22|
|  Ravi|English|   67|    D|          18|
|  Neha|   Math|   94|    A|          19|
| Meena|Science|   85|    B|          30|
+------+-------+-----+-----+------------+



In [23]:
#Calculate attendance-adjusted grade:
from pyspark.sql.functions import when

df_adjusted = df_joined.withColumn(
    "adjusted_grade",
    when(F.col("days_present") < 20,
         when(F.col("grade") == "A", "B")
        .when(F.col("grade") == "B", "C")
        .when(F.col("grade") == "C", "D")
        .otherwise("D")
    ).otherwise(F.col("grade"))
)
df_adjusted.select("name", "subject", "score", "grade", "days_present", "adjusted_grade").show()


+------+-------+-----+-----+------------+--------------+
|  name|subject|score|grade|days_present|adjusted_grade|
+------+-------+-----+-----+------------+--------------+
|Ananya|Science|   92|    A|          25|             A|
|  Ravi|   Math|   88|    B|          18|             C|
| Kavya|English|   79|    C|          22|             C|
|  Ravi|English|   67|    D|          18|             D|
|  Neha|   Math|   94|    A|          19|             B|
| Meena|Science|   85|    B|          30|             B|
+------+-------+-----+-----+------------+--------------+



In [24]:
# 6. Partitioned Load (Full + Incremental)
df_scores.write.partitionBy("subject").mode("overwrite").parquet("/tmp/scores/")


In [25]:
# Incremental new data
incremental_data = [("Meena", "Math", 93)]
df_inc = spark.createDataFrame(incremental_data, ["name", "subject", "score"])
df_inc.write.mode("append").partitionBy("subject").parquet("/tmp/scores/")


In [26]:
#List all partition folders
import os

# View folders under /tmp/scores (works in local, may not in Colab)
print(os.listdir("/tmp/scores"))


['._SUCCESS.crc', 'subject=Science', 'subject=English', '_SUCCESS', 'subject=Math']


In [27]:
#Read only Math partition and display all entries.
df_math = spark.read.parquet("/tmp/scores/subject=Math")

df_math.show()

+-----+-----+-----+
| name|score|grade|
+-----+-----+-----+
| Neha|   94|    A|
| Ravi|   88|    B|
|Meena|   93| NULL|
+-----+-----+-----+



In [30]:
# ETL: Clean, Transform, Load
from google.colab import files
uploaded = files.upload()



Saving employee_raw.csv to employee_raw.csv


In [31]:
df_emp = spark.read.option("header", True).csv("/content/employee_raw.csv")
df_emp.show()


+------+-----+-------+------+------+
|emp_id| name|   dept|salary| bonus|
+------+-----+-------+------+------+
|     1|Arjun|     IT| 78000|5000.0|
|     2|Kavya|     HR| 62000|  NULL|
|     3|Sneha|Finance| 55000|3000.0|
+------+-----+-------+------+------+



In [32]:
from pyspark.sql.functions import col

df_emp = df_emp.withColumn("salary", col("salary").cast("int"))
df_emp = df_emp.withColumn("bonus", col("bonus").cast("int"))


In [33]:
#Fill missing bonus with 2000
df_emp = df_emp.fillna({"bonus": 2000})


In [34]:
#Calculate total_ctc = salary + bonus
from pyspark.sql.functions import expr

df_emp = df_emp.withColumn("total_ctc", expr("salary + bonus"))
df_emp.select("emp_id", "name", "salary", "bonus", "total_ctc").show()


+------+-----+------+-----+---------+
|emp_id| name|salary|bonus|total_ctc|
+------+-----+------+-----+---------+
|     1|Arjun| 78000| 5000|    83000|
|     2|Kavya| 62000| 2000|    64000|
|     3|Sneha| 55000| 3000|    58000|
+------+-----+------+-----+---------+



In [35]:
#Filter where total_ctc > 60,000
df_high_earners = df_emp.filter(col("total_ctc") > 60000)
df_high_earners.show()


+------+-----+----+------+-----+---------+
|emp_id| name|dept|salary|bonus|total_ctc|
+------+-----+----+------+-----+---------+
|     1|Arjun|  IT| 78000| 5000|    83000|
|     2|Kavya|  HR| 62000| 2000|    64000|
+------+-----+----+------+-----+---------+



In [36]:
#Save final DataFrame to Parquet and JSON
df_high_earners.write.mode("overwrite").json("/content/cleaned_employees_json")