#    Exercise 1.1 INTIALIZING SPARKSESSION

In [45]:
from pyspark.sql import SparkSession
spark = SparkSession.builder .appName("BotCampus Intermediate Session") .master("local[*]").getOrCreate()
sc=spark.sparkContext

#  Exercise 1.2-Load starter data

In [46]:
data = [("Ananya", "Bangalore", 24),
("Ravi", "Hyderabad", 28),
("Kavya", "Delhi", 22),
("Meena", "Chennai", 25)]
columns = ["name", "city", "age"]
df = spark.createDataFrame(data, columns)
df.show()

+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Ananya|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
+------+---------+---+



#  Exercise 2.1 – Create RDD from feedback:

In [48]:
feedback = spark.sparkContext.parallelize([
"Ravi from Bangalore loved the mobile app",
"Meena from Delhi reported poor response time",
"Ajay from Pune liked the delivery speed",
"Ananya from Hyderabad had an issue with UI",
"Rohit from Mumbai gave positive feedback"
])

In [67]:
#counting total number of words
word_count=feedback.flatMap(lambda x:x.split())
word_count.count()

35

In [81]:
#finding top 3 most common words
word_count=feedback.flatMap(lambda x:x.split()).map(lambda x:(x.lower(),1)).reduceByKey(lambda x,y:x+y).takeOrdered(3,key=lambda pair: -pair[1])
print("Top 3words:",word_count)

Top 3words: [('from', 5), ('the', 2), ('loved', 1)]


In [84]:
# Remove stop words (from , with , the , etc.).
stop_words = {"from", "with", "the", "had", "gave", "an", "and", "to", "for", "on", "in", "a"}
filtered=feedback.flatMap(lambda x:x.split()).map(lambda x:x.lower()).filter(lambda x:x not in stop_words)
filtered.collect()

['ravi',
 'bangalore',
 'loved',
 'mobile',
 'app',
 'meena',
 'delhi',
 'reported',
 'poor',
 'response',
 'time',
 'ajay',
 'pune',
 'liked',
 'delivery',
 'speed',
 'ananya',
 'hyderabad',
 'issue',
 'ui',
 'rohit',
 'mumbai',
 'positive',
 'feedback']

In [92]:
# Create a dictionary of word → count.
word_dict=feedback.flatMap(lambda x:x.split()).map(lambda x:(x.lower(),1)).reduceByKey(lambda x,y:x+y).collectAsMap()
print(word_dict)

{'from': 5, 'loved': 1, 'app': 1, 'poor': 1, 'response': 1, 'liked': 1, 'speed': 1, 'ananya': 1, 'an': 1, 'issue': 1, 'with': 1, 'rohit': 1, 'mumbai': 1, 'positive': 1, 'feedback': 1, 'ravi': 1, 'bangalore': 1, 'the': 2, 'mobile': 1, 'meena': 1, 'delhi': 1, 'reported': 1, 'time': 1, 'ajay': 1, 'pune': 1, 'delivery': 1, 'hyderabad': 1, 'had': 1, 'ui': 1, 'gave': 1}


#3. DataFrames – Transformations
 Exercise 3.1 – Create exam_scores
 DataFrame:

In [96]:
scores = [
("Ravi", "Math", 88),
("Ananya", "Science", 92),
("Kavya", "English", 79),
("Ravi", "English", 67),
("Neha", "Math", 94),
("Meena", "Science", 85)
]
columns = ["name", "subject", "score"]
df_scores = spark.createDataFrame(scores, columns)
df_scores.show()

+------+-------+-----+
|  name|subject|score|
+------+-------+-----+
|  Ravi|   Math|   88|
|Ananya|Science|   92|
| Kavya|English|   79|
|  Ravi|English|   67|
|  Neha|   Math|   94|
| Meena|Science|   85|
+------+-------+-----+



In [166]:
# Add grade column (>=90 → A, 80-89 → B, 70-79 → C, else D).
from pyspark.sql.functions import col,when,sum
df_scores=df_scores.withColumn("grade_column",when(col('score')>=90,"A").when(col('score')>=80,"B").when(col('score')>=70,"C").otherwise("D"))
df_scores.show()
print("---------------------------")
 #Group by subject, find average score.
grouped_sub=df_scores.groupBy("subject").sum("score")
grouped_sub.show()
print("------------------------------")
#Use when and otherwise to classify subject difficulty (Difficult).

df_scores=df_scores.withColumn("subject difficulity",when(col('subject').isin(["Math","Science"]),"Difficult").otherwise("Easy"))
df_scores.show()
print("------------------------------")
# Rank students per subject using Window function.
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
windowSpec=Window.partitionBy("subject").orderBy(col("score").desc())
df_scores=df_scores.withColumn("rank",rank().over(windowSpec))
df_scores.show()
print("------------------------------")
# Apply UDF to format names (e.g., make all uppercase)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
to_upper=udf(lambda x:x.upper(),StringType())
df_scores=df_scores.withColumn("name",to_upper(col("name")))
df_scores.show()
df_scores=df_scores.select("name","subject","score","subject difficulity","grade_column","rank")


+------+-------+-----+-------------------+------------+----+
|  name|subject|score|subject difficulity|grade_column|rank|
+------+-------+-----+-------------------+------------+----+
| KAVYA|English|   79|               Easy|           C|   1|
|  RAVI|English|   67|               Easy|           D|   2|
|  NEHA|   Math|   94|          Difficult|           A|   1|
|  RAVI|   Math|   88|          Difficult|           B|   2|
|ANANYA|Science|   92|          Difficult|           A|   1|
| MEENA|Science|   85|          Difficult|           B|   2|
+------+-------+-----+-------------------+------------+----+

---------------------------
+-------+----------+
|subject|sum(score)|
+-------+----------+
|Science|       177|
|   Math|       182|
|English|       146|
+-------+----------+

------------------------------
+------+-------+-----+-------------------+------------+----+
|  name|subject|score|subject difficulity|grade_column|rank|
+------+-------+-----+-------------------+------------+----+

# 4. Ingest CSV & JSON – Save to Parquet
 Dataset 1: CSV file:
students.csv

In [112]:
csv_data="""
id,name,department,city,salary
1,Amit,IT,Bangalore,78000
2,Kavya,HR,Chennai,62000
3,Arjun,Finance,Hyderabad,55000"""
with open('students.csv','w')as f:
  f.write(csv_data)

In [117]:
import json
json_data=[
    {
 "id": 101,
 "name": "Sneha",
 "address": {
   "city": "Mumbai",
   "pincode": 400001
  },
 "skills": ["Python", "Spark"]
 }
]
with open('employee_nested.json','w')as f:
  json.dump(json_data,f)

# loading datasets into pyspark

In [123]:
sf=spark.read.csv('students.csv',header=True,inferSchema=True)

ef=spark.read.json('employee_nested.json',multiLine=True)


In [124]:
# Print schema and infer nested structure.
sf.printSchema()
ef.printSchema()
sf.show()
ef.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pincode: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+-----+----------+---------+------+
| id| name|department|     city|salary|
+---+-----+----------+---------+------+
|  1| Amit|        IT|Bangalore| 78000|
|  2|Kavya|        HR|  Chennai| 62000|
|  3|Arjun|   Finance|Hyderabad| 55000|
+---+-----+----------+---------+------+

+----------------+---+-----+---------------+
|         address| id| name|         skills|
+----------------+---+-----+---------------+
|{Mumbai, 400001}|101|Sneha|[Python, Spark]|
+----------------+---+-----+---------------+



In [137]:
#flattening json
from pyspark.sql.functions import explode
ef=ef.select(col("id"),col("name"),col("address.city").alias("city"),col("address.pincode").alias("pincode"),explode(col("skills")).alias("skill"))
ef.show()
#writing both files as parquet
sf.write.mode("overwrite").parquet("C:\\Users\\AnbuC\\OneDrive\\Desktop\\tmp\\students_parquet")

+---+-----+------+-------+------+
| id| name|  city|pincode| skill|
+---+-----+------+-------+------+
|101|Sneha|Mumbai| 400001|Python|
|101|Sneha|Mumbai| 400001| Spark|
+---+-----+------+-------+------+



In [146]:
#writing both files as parquet
sf.write.mode("overwrite").parquet("/tmp/output/students_parquet")
ef.write.mode("overwrite").parquet("/tmp/output/employee_parquet")


# 5. Spark SQL – Temp Views & Queries
 Exercise 5.1 Create view from exam scores and run:

In [168]:
df_scores.createOrReplaceTempView("exam_scores")
spark.sql("""select subject,name,score,rn from (select*,rank() over (partition by subject order by score desc)  as rn from exam_scores as tmp)
 where rn=1 """).show()

+-------+------+-----+---+
|subject|  name|score| rn|
+-------+------+-----+---+
|English| KAVYA|   79|  1|
|   Math|  NEHA|   94|  1|
|Science|ANANYA|   92|  1|
+-------+------+-----+---+



In [176]:
#count of students per grade
spark.sql("""select grade_column,count(*)as count_students from exam_scores group by grade_column""").show()

#students with multiple subjects

spark.sql("""select name from (select name,count(subject)as count_multiple_subjects from exam_scores group by name)as tmp where count_multiple_subjects >1""").show()

# subjects with average score above 85
spark.sql("""select subject,avg(score) as avg_score from exam_scores group by subject having avg_score >85 """).show()

+------------+--------------+
|grade_column|count_students|
+------------+--------------+
|           B|             2|
|           C|             1|
|           A|             2|
|           D|             1|
+------------+--------------+

+----+
|name|
+----+
|RAVI|
+----+

+-------+---------+
|subject|avg_score|
+-------+---------+
|Science|     88.5|
|   Math|     91.0|
+-------+---------+



 Exercise 5.2 Create another DataFrame
attendance(name, days_present)

In [186]:
att_data = [("RAVI", 22), ("ANANYA", 18), ("KAVYA", 25), ("MEENA", 19), ("NEHA", 20)]
df_att = spark.createDataFrame(att_data, ["name", "days_present"])
df_att.show()
print("-----------------")

# joining with score
joined_df=df_scores.join(df_att,on='name',how='left')
joined_df.show()
#Calculate attendance-adjusted grade:
print("---------------")
from pyspark.sql.functions import expr
grade_order=expr("""
              case
               when days_present < 20 and grade_column="A" then "B"
               when days_present < 20 and grade_column="B" then "c"
               when days_present < 20 and grade_column="c" then "D"
               else grade_column
               end

 """)
joined_df.withColumn("grade_adjusted",grade_order).show()



+------+------------+
|  name|days_present|
+------+------------+
|  RAVI|          22|
|ANANYA|          18|
| KAVYA|          25|
| MEENA|          19|
|  NEHA|          20|
+------+------------+

-----------------
+------+-------+-----+-------------------+------------+----+------------+
|  name|subject|score|subject difficulity|grade_column|rank|days_present|
+------+-------+-----+-------------------+------------+----+------------+
|  NEHA|   Math|   94|          Difficult|           A|   1|          20|
| MEENA|Science|   85|          Difficult|           B|   2|          19|
|  RAVI|English|   67|               Easy|           D|   2|          22|
|  RAVI|   Math|   88|          Difficult|           B|   2|          22|
|ANANYA|Science|   92|          Difficult|           A|   1|          18|
| KAVYA|English|   79|               Easy|           C|   1|          25|
+------+-------+-----+-------------------+------------+----+------------+

---------------
+------+-------+-----+----

# 6. Partitioned Load (Full + Incremental)

In [187]:
# Initial Load:
df_scores.write.partitionBy("subject").parquet("/tmp/scores/")

In [188]:
# incremental load
incremental = [("Meena", "Math", 93)]
df_inc = spark.createDataFrame(incremental, columns)
df_inc.write.mode("append").partitionBy("subject").parquet("/tmp/scores/")

In [191]:
#listing all folder inside  /tmp/scres
import os
print("partitions:"),os.listdir("/tmp/scores/")
#reading only math partion
df_math = spark.read.parquet("/tmp/scores/subject=Math")
df_math.show()


partitions:
+-----+-----+-------------------+------------+----+
| name|score|subject difficulity|grade_column|rank|
+-----+-----+-------------------+------------+----+
| NEHA|   94|          Difficult|           A|   1|
| RAVI|   88|          Difficult|           B|   2|
|Meena|   93|               NULL|        NULL|NULL|
+-----+-----+-------------------+------------+----+



#ETL: Clean, Transform, Load

In [208]:
raw_data = [
    (1, "Arjun", "IT",      78000, 5000),
    (2, "Kavya", "HR",      62000, None),
    (3, "Sneha", "Finance", 55000, 3000)
]
cols = ["emp_id", "name", "dept", "salary", "bonus"]

from pyspark.sql.functions import coalesce, lit

df_raw = (
    spark.createDataFrame(raw_data, cols).withColumn("bonus", coalesce(col("bonus"), lit(2000)))
)
df_raw.show()
print("---------------")
# Calculate  total_ctc
df_raw=df_raw.withColumn("total_ctc",col('salary')+col('bonus'))
df_raw.show()
print("----------------")

#filter total_ctc>60000
df_filter=df_raw.filter(col('total_ctc')>60000)
df_filter.show()
print("--------------")



+------+-----+-------+------+-----+
|emp_id| name|   dept|salary|bonus|
+------+-----+-------+------+-----+
|     1|Arjun|     IT| 78000| 5000|
|     2|Kavya|     HR| 62000| 2000|
|     3|Sneha|Finance| 55000| 3000|
+------+-----+-------+------+-----+

---------------
+------+-----+-------+------+-----+---------+
|emp_id| name|   dept|salary|bonus|total_ctc|
+------+-----+-------+------+-----+---------+
|     1|Arjun|     IT| 78000| 5000|    83000|
|     2|Kavya|     HR| 62000| 2000|    64000|
|     3|Sneha|Finance| 55000| 3000|    58000|
+------+-----+-------+------+-----+---------+

----------------
+------+-----+----+------+-----+---------+
|emp_id| name|dept|salary|bonus|total_ctc|
+------+-----+----+------+-----+---------+
|     1|Arjun|  IT| 78000| 5000|    83000|
|     2|Kavya|  HR| 62000| 2000|    64000|
+------+-----+----+------+-----+---------+



In [210]:
#saving final dataframe to parquet and json
df_filter.write.mode("overwrite").parquet("/tmp/output/final_ctc.parquet")
df_filter.write.mode("overwrite").json("/tmp/output/final_ctc.json")