#1. PySpark Setup & Initialization
###Exercise 1.1 – Setup Spark:
###Initialize SparkSession with:

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BotCampus Intermediate Session") \
    .master("local[*]") \
    .getOrCreate()

###  Exercise 1.2 – Load starter data:

In [2]:
data = [("Ananya", "Bangalore", 24),
        ("Ravi", "Hyderabad", 28),
        ("Kavya", "Delhi", 22),
        ("Meena", "Chennai", 25)]
columns = ["name", "city", "age"]

df = spark.createDataFrame(data, columns)
df.show()

+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Ananya|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
+------+---------+---+



# 2. RDDs & Transformations
###Exercise 2.1 – Create RDD from feedback:

In [3]:
feedback = spark.sparkContext.parallelize([
"Ravi from Bangalore loved the mobile app",
"Meena from Delhi reported poor response time",
"Ajay from Pune liked the delivery speed",
"Ananya from Hyderabad had an issue with UI",
"Rohit from Mumbai gave positive feedback"
])

# Tasks:
##Count total number of words.

In [4]:
total_words = feedback.flatMap(lambda line: line.split()).count()
print("Total number of words:", total_words)

Total number of words: 35


##  Find top 3 most common words.

In [6]:
from collections import Counter
word_counts = feedback.flatMap(lambda line: line.lower().split()) \
.filter(lambda word: word not in {"the", "with", "and", "from", "had", "an"}) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: -x[1]) \
.take(3)
print("Top 3 most common words:", word_counts)

Top 3 most common words: [('loved', 1), ('app', 1), ('poor', 1)]


##  Remove stop words (from ,  with ,  the , etc.).

In [9]:
stop_words = {"from", "with", "the", "an", "a", "had", "and", "is", "was", "of", "to"}
filtered_words_rdd = feedback.flatMap(lambda line: line.lower().split()) \
.filter(lambda word: word not in stop_words)

## Create a dictionary of word → count.

In [11]:
word_count_dict = dict(filtered_words_rdd.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.collect())
print(word_count_dict)

{'loved': 1, 'app': 1, 'poor': 1, 'response': 1, 'liked': 1, 'speed': 1, 'ananya': 1, 'issue': 1, 'rohit': 1, 'mumbai': 1, 'positive': 1, 'feedback': 1, 'ravi': 1, 'bangalore': 1, 'mobile': 1, 'meena': 1, 'delhi': 1, 'reported': 1, 'time': 1, 'ajay': 1, 'pune': 1, 'delivery': 1, 'hyderabad': 1, 'ui': 1, 'gave': 1}


# 3. DataFrames – Transformations
###Exercise 3.1 – Create exam_scores DataFrame:

In [33]:
from pyspark.sql.functions import when, col, upper, avg, rank
from pyspark.sql.window import Window
scores = [
("Ravi", "Math", 88),
("Ananya", "Science", 92),
("Kavya", "English", 79),
("Ravi", "English", 67),
("Neha", "Math", 94),
("Meena", "Science", 85)
]
columns = ["name", "subject", "score"]
df_scores = spark.createDataFrame(scores, columns)

# Tasks:
##Add grade column(>=90 → A, 80-89 → B, 70-79 → C, else D).

In [26]:
df_scores = df_scores.withColumn("grade", when(col("score") >= 90, "A").when(col("score") >= 80, "B").when(col("score") >= 70, "C").otherwise("D")).show()

+------+-------+-----+-----+
|  name|subject|score|grade|
+------+-------+-----+-----+
|  Ravi|   Math|   88|    B|
|Ananya|Science|   92|    A|
| Kavya|English|   79|    C|
|  Ravi|English|   67|    D|
|  Neha|   Math|   94|    A|
| Meena|Science|   85|    B|
+------+-------+-----+-----+



## Group by subject, find average score.

In [29]:
df_scores.groupBy("subject").agg(avg("score").alias("avg_score")).show()

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
|English|     73.0|
+-------+---------+



##  Use when and otherwise to classify subject difficulty (Difficult).

In [19]:
df_scores = df_scores.withColumn("difficulty", when(col("subject").isin("Math", "Science"), "Difficult").otherwise("Easy")).show()

+------+-------+-----+-----+----------+
|  name|subject|score|grade|difficulty|
+------+-------+-----+-----+----------+
|  Ravi|   Math|   88|    B| Difficult|
|Ananya|Science|   92|    A| Difficult|
| Kavya|English|   79|    C|      Easy|
|  Ravi|English|   67|    D|      Easy|
|  Neha|   Math|   94|    A| Difficult|
| Meena|Science|   85|    B| Difficult|
+------+-------+-----+-----+----------+



##  Rank students per subject using Window function.

In [34]:
window_spec = Window.partitionBy("subject").orderBy(col("score").desc())
df_scores = df_scores.withColumn("rank", rank().over(window_spec))

## Apply UDF to format names (e.g., make all uppercase).

In [35]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
upper_udf = udf(lambda x: x.upper(), StringType())
df_scores = df_scores.withColumn("name_upper", upper_udf("name"))
df_scores.show()

+------+-------+-----+----+----------+
|  name|subject|score|rank|name_upper|
+------+-------+-----+----+----------+
| Kavya|English|   79|   1|     KAVYA|
|  Ravi|English|   67|   2|      RAVI|
|  Neha|   Math|   94|   1|      NEHA|
|  Ravi|   Math|   88|   2|      RAVI|
|Ananya|Science|   92|   1|    ANANYA|
| Meena|Science|   85|   2|     MEENA|
+------+-------+-----+----+----------+



# 4. Ingest CSV & JSON – Save to Parquet
##Dataset 1: CSV file: students.csv

In [37]:
from google.colab import files
uploaded = files.upload()

Saving students.csv to students.csv


In [38]:
df_csv = spark.read.option("header", True).option("inferSchema", True).csv("students.csv")
df_csv.printSchema()
df_csv.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary : double (nullable = true)

+---+-----+----------+---------+-------+
| id| name|department|     city|salary |
+---+-----+----------+---------+-------+
|  1| Amit|        IT|Bangalore|78000.0|
|  2|Kavya|        HR|  Chennai|62000.0|
|  3|Arjun|   Finance|Hyderabad|55000.0|
+---+-----+----------+---------+-------+



In [39]:
from google.colab import files
uploaded = files.upload()

Saving employee_nested.json to employee_nested.json


In [40]:
df_json = spark.read.option("multiline", True).json("employee_nested.json")
df_json.printSchema()
df_json.show(truncate=False)

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pincode: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+---+-----+---------------+
|address         |id |name |skills         |
+----------------+---+-----+---------------+
|{Mumbai, 400001}|101|Sneha|[Python, Spark]|
+----------------+---+-----+---------------+



##  Flatten the JSON (use explode , select , alias ).

In [41]:
from pyspark.sql.functions import explode, col
df_flattened = df_json.select(col("id"),col("name"),col("address.city").alias("city"),col("address.pincode")
.alias("pincode"),explode(col("skills")).alias("skill"))
df_flattened.show()

+---+-----+------+-------+------+
| id| name|  city|pincode| skill|
+---+-----+------+-------+------+
|101|Sneha|Mumbai| 400001|Python|
|101|Sneha|Mumbai| 400001| Spark|
+---+-----+------+-------+------+



##  Convert both to Parquet and write to /tmp/output .

In [42]:
df_csv.write.mode("overwrite").parquet("/tmp/output/students_parquet")
df_flattened.write.mode("overwrite").parquet("/tmp/output/employees_parquet")

#5. Spark SQL – Temp Views & Queries
##Exercise 5.1 Create view from exam scores and run:

In [45]:
df_scores.createOrReplaceTempView("exam_scores")
# a)Top scorer per subject
spark.sql("""SELECT subject, name, score FROM(SELECT *, RANK() OVER(PARTITION BY subject ORDER BY score DESC) as rnk FROM exam_scores)WHERE rnk = 1""").show()

from pyspark.sql.functions import when, col
# Add grade column
df_scores = df_scores.withColumn("grade", when(col("score") >= 90, "A").when(col("score") >= 80, "B").when(col("score") >= 70, "C").otherwise("D"))
df_scores.createOrReplaceTempView("exam_scores")

# b)Count of students per grade
spark.sql("SELECT grade, COUNT(*) as count FROM exam_scores GROUP BY grade").show()

# c)Students with multiple subjects
spark.sql("SELECT name, COUNT(subject) as subject_count FROM exam_scores GROUP BY name HAVING subject_count > 1").show()

# d)Subjects with average score above 85
spark.sql("SELECT subject, AVG(score) as avg_score FROM exam_scores GROUP BY subject HAVING avg_score > 85").show()


+-------+------+-----+
|subject|  name|score|
+-------+------+-----+
|English| Kavya|   79|
|   Math|  Neha|   94|
|Science|Ananya|   92|
+-------+------+-----+

+-----+-----+
|grade|count|
+-----+-----+
|    B|    2|
|    C|    1|
|    A|    2|
|    D|    1|
+-----+-----+

+----+-------------+
|name|subject_count|
+----+-------------+
|Ravi|            2|
+----+-------------+

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
+-------+---------+



# Exercise 5.2 Create another DataFrame attendance(name, days_present) and:
##Join with scores
##Calculate attendance-adjusted grade:
###If days_present < 20 → downgrade grade by one level

In [46]:
attendance = [("Ravi", 25), ("Ananya", 18), ("Meena", 22), ("Kavya", 19)]
df_attendance = spark.createDataFrame(attendance, ["name", "days_present"])
df_joined = df_scores.join(df_attendance, on="name", how="left")

# Downgrade grade if attendance < 20
def downgrade(grade):
    order = ["A", "B", "C", "D"]
    idx = order.index(grade)
    return order[min(idx + 1, 3)]

from pyspark.sql.functions import udf
downgrade_udf = udf(downgrade, StringType())
df_final = df_joined.withColumn("final_grade",when(col("days_present") < 20, downgrade_udf(col("grade"))).otherwise(col("grade")))
df_final.select("name", "subject", "score", "grade", "days_present", "final_grade").show()

+------+-------+-----+-----+------------+-----------+
|  name|subject|score|grade|days_present|final_grade|
+------+-------+-----+-----+------------+-----------+
|Ananya|Science|   92|    A|          18|          B|
|  Ravi|   Math|   88|    B|          25|          B|
| Kavya|English|   79|    C|          19|          D|
|  Ravi|English|   67|    D|          25|          D|
|  Neha|   Math|   94|    A|        NULL|          A|
| Meena|Science|   85|    B|          22|          B|
+------+-------+-----+-----+------------+-----------+



#  6. Partitioned Load (Full + Incremental)
##Initial Load

In [47]:
df_scores.write.mode("overwrite").partitionBy("subject").parquet("/tmp/scores/")

##  Incremental Load:

In [48]:
incremental = [("Meena", "Math", 93)]
df_inc = spark.createDataFrame(incremental, ["name", "subject", "score"])
df_inc.write.mode("append").partitionBy("subject").parquet("/tmp/scores/")

#Task:
##List all folders inside Read only /tmp/scores/
##Read only Math partition and display all entries.

In [49]:
df_math = spark.read.parquet("/tmp/scores/subject=Math")
df_math.show()

+-----+-----+----+----------+-----+
| name|score|rank|name_upper|grade|
+-----+-----+----+----------+-----+
| Neha|   94|   1|      NEHA|    A|
| Ravi|   88|   2|      RAVI|    B|
|Meena|   93|NULL|      NULL| NULL|
+-----+-----+----+----------+-----+



# 7. ETL: Clean, Transform, Load
##Raw CSV:

In [50]:
from google.colab import files
uploaded = files.upload()

Saving emp.csv to emp.csv


In [53]:
df_raw = spark.read.option("header", True).csv("emp.csv", inferSchema=True)

## Fill missing bonus with 2000.

In [55]:
# Strip all column names
df_raw = df_raw.toDF(*[col_name.strip() for col_name in df_raw.columns])
# Now fill nulls in the 'bonus' column
df_filled = df_raw.fillna({"bonus": 2000})
df_filled.show()

+------+-----+-------+------+-----+
|emp_id| name|   dept|salary|bonus|
+------+-----+-------+------+-----+
|     1|Arjun|     IT| 78000|5000 |
|     2|Kavya|     HR| 62000|     |
|     3|Sneha|Finance| 55000| 3000|
+------+-----+-------+------+-----+



##Calculate total_ctc = salary + bonus .

In [59]:
from pyspark.sql.functions import col
df_final = df_filled.withColumn("total_ctc", col("salary") + col("bonus"))
df_final.show()

+------+-----+-------+------+-----+---------+
|emp_id| name|   dept|salary|bonus|total_ctc|
+------+-----+-------+------+-----+---------+
|     1|Arjun|     IT| 78000|5000 |  83000.0|
|     2|Kavya|     HR| 62000|     |     NULL|
|     3|Sneha|Finance| 55000| 3000|  58000.0|
+------+-----+-------+------+-----+---------+



## Filter where total_ctc > 60,000.

In [61]:
df_filtered = df_final.filter(col("total_ctc") > 60000)
df_filtered.show()

+------+-----+----+------+-----+---------+
|emp_id| name|dept|salary|bonus|total_ctc|
+------+-----+----+------+-----+---------+
|     1|Arjun|  IT| 78000|5000 |  83000.0|
+------+-----+----+------+-----+---------+



## Save final DataFrame to Parquet and JSON.

In [64]:
df_filtered.write.mode("overwrite").parquet("/tmp/final_output_parquet")
df_filtered.write.mode("overwrite").json("/tmp/final_output_json")

In [67]:
!zip -r /tmp/final_output_parquet.zip /tmp/final_output_parquet
!zip -r /tmp/final_output_json.zip /tmp/final_output_json

  adding: tmp/final_output_parquet/ (stored 0%)
  adding: tmp/final_output_parquet/._SUCCESS.crc (stored 0%)
  adding: tmp/final_output_parquet/.part-00000-ce7e8ca9-28a8-4f8d-82f5-f9be81802c29-c000.snappy.parquet.crc (stored 0%)
  adding: tmp/final_output_parquet/part-00000-ce7e8ca9-28a8-4f8d-82f5-f9be81802c29-c000.snappy.parquet (deflated 53%)
  adding: tmp/final_output_parquet/_SUCCESS (stored 0%)
  adding: tmp/final_output_json/ (stored 0%)
  adding: tmp/final_output_json/._SUCCESS.crc (stored 0%)
  adding: tmp/final_output_json/part-00000-ef7dbf01-afe4-4739-af87-9fef80195c9a-c000.json (deflated 7%)
  adding: tmp/final_output_json/_SUCCESS (stored 0%)
  adding: tmp/final_output_json/.part-00000-ef7dbf01-afe4-4739-af87-9fef80195c9a-c000.json.crc (stored 0%)


In [68]:
from google.colab import files

files.download("/tmp/final_output_parquet.zip")
files.download("/tmp/final_output_json.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>