In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("BotCampus PySpark Practice") \
    .master("local[*]") \
    .getOrCreate()

In [5]:
data = [("Anjali", "Bangalore", 24),
        ("Ravi", "Hyderabad", 28),
        ("Kavya", "Delhi", 22),
        ("Meena", "Chennai", 25),
        ("Arjun", "Mumbai", 30)]
columns = ["name", "city", "age"]
df = spark.createDataFrame(data, columns)

In [6]:
df.show()

+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Anjali|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
| Arjun|   Mumbai| 30|
+------+---------+---+



In [7]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- age: long (nullable = true)



In [12]:
rdd = df.rdd
print(rdd.collect())

[Row(name='Anjali', city='Bangalore', age=24), Row(name='Ravi', city='Hyderabad', age=28), Row(name='Kavya', city='Delhi', age=22), Row(name='Meena', city='Chennai', age=25), Row(name='Arjun', city='Mumbai', age=30)]


In [13]:
mapped_rdd = rdd.map(lambda x: (x.name.upper(), x.city, x.age+1))
print(mapped_rdd.collect())

[('ANJALI', 'Bangalore', 25), ('RAVI', 'Hyderabad', 29), ('KAVYA', 'Delhi', 23), ('MEENA', 'Chennai', 26), ('ARJUN', 'Mumbai', 31)]


## Module 2: RDDs & Transformations

In [16]:
feedback = spark.sparkContext.parallelize([
    "Ravi from Bangalore loved the delivery",
    "Meena from Hyderabad had a late order",
    "Ajay from Pune liked the service",
    "Anjali from Delhi faced UI issues",
    "Rohit from Mumbai gave positive feedback"
])

In [20]:
words = feedback.flatMap(lambda line: line.split(" "))
print(words.collect())

['Ravi', 'from', 'Bangalore', 'loved', 'the', 'delivery', 'Meena', 'from', 'Hyderabad', 'had', 'a', 'late', 'order', 'Ajay', 'from', 'Pune', 'liked', 'the', 'service', 'Anjali', 'from', 'Delhi', 'faced', 'UI', 'issues', 'Rohit', 'from', 'Mumbai', 'gave', 'positive', 'feedback']


In [24]:
stop_words = {"from", "the", "a", "had", "and", "of"}
filtered_words = words.filter(lambda w: w.lower() not in stop_words)
print(filtered_words.collect())

['Ravi', 'Bangalore', 'loved', 'delivery', 'Meena', 'Hyderabad', 'late', 'order', 'Ajay', 'Pune', 'liked', 'service', 'Anjali', 'Delhi', 'faced', 'UI', 'issues', 'Rohit', 'Mumbai', 'gave', 'positive', 'feedback']


In [30]:
word_counts = filtered_words.map(lambda w: (w.lower(), 1)).reduceByKey(lambda a,b: a+b)
print(word_counts.collect())

[('loved', 1), ('liked', 1), ('service', 1), ('anjali', 1), ('faced', 1), ('issues', 1), ('rohit', 1), ('mumbai', 1), ('positive', 1), ('feedback', 1), ('ravi', 1), ('bangalore', 1), ('delivery', 1), ('meena', 1), ('hyderabad', 1), ('late', 1), ('order', 1), ('ajay', 1), ('pune', 1), ('delhi', 1), ('ui', 1), ('gave', 1)]


In [33]:
top3 = word_counts.takeOrdered(3, key=lambda x: -x[1])
print(top3)

[('loved', 1), ('liked', 1), ('service', 1)]


## Module 3: DataFrames & Transformation (With Joins)

In [34]:
from pyspark.sql.functions import col, when

students = [("Amit", "10-A", 89),
            ("Kavya", "10-B", 92),
            ("Anjali", "10-A", 78),
            ("Rohit", "10-B", 85),
            ("Sneha", "10-C", 80)]
columns1 = ["name", "section", "marks"]
df_students = spark.createDataFrame(students, columns1)

attendance = [("Amit", 24),
              ("Kavya", 22),
              ("Anjali", 20),
              ("Rohit", 25),
              ("Sneha", 19)]
columns2 = ["name", "days_present"]
df_attendance = spark.createDataFrame(attendance, columns2)

In [37]:
df_joined = df_students.join(df_attendance, on="name")
df_joined.show()

+------+-------+-----+------------+
|  name|section|marks|days_present|
+------+-------+-----+------------+
|  Amit|   10-A|   89|          24|
|Anjali|   10-A|   78|          20|
| Kavya|   10-B|   92|          22|
| Rohit|   10-B|   85|          25|
| Sneha|   10-C|   80|          19|
+------+-------+-----+------------+



In [41]:
df_joined = df_joined.withColumn("attendance_rate", col("days_present")/25)
df_joined.show()

+------+-------+-----+------------+---------------+
|  name|section|marks|days_present|attendance_rate|
+------+-------+-----+------------+---------------+
|  Amit|   10-A|   89|          24|           0.96|
|Anjali|   10-A|   78|          20|            0.8|
| Kavya|   10-B|   92|          22|           0.88|
| Rohit|   10-B|   85|          25|            1.0|
| Sneha|   10-C|   80|          19|           0.76|
+------+-------+-----+------------+---------------+



In [45]:
df_joined = df_joined.withColumn(
    "grade",
    when(col("marks") > 90, "A")
    .when((col("marks") >= 80) & (col("marks") <= 90), "B")
    .otherwise("C")
)
df_joined.show()

+------+-------+-----+------------+---------------+-----+
|  name|section|marks|days_present|attendance_rate|grade|
+------+-------+-----+------------+---------------+-----+
|  Amit|   10-A|   89|          24|           0.96|    B|
|Anjali|   10-A|   78|          20|            0.8|    C|
| Kavya|   10-B|   92|          22|           0.88|    A|
| Rohit|   10-B|   85|          25|            1.0|    B|
| Sneha|   10-C|   80|          19|           0.76|    B|
+------+-------+-----+------------+---------------+-----+



In [50]:
df_filtered = df_joined.filter((col("grade").isin("A","B")) & (col("attendance_rate") < 0.8))
df_filtered.show()

+-----+-------+-----+------------+---------------+-----+
| name|section|marks|days_present|attendance_rate|grade|
+-----+-------+-----+------------+---------------+-----+
|Sneha|   10-C|   80|          19|           0.76|    B|
+-----+-------+-----+------------+---------------+-----+



In [52]:
csv_data = "emp_id,name,dept,city,salary\n101,Anil,IT,Bangalore,80000\n102,Kiran,HR,Mumbai,65000\n103,Deepa,Finance,Chennai,72000"
with open("/content/employees.csv", "w") as f:
    f.write(csv_data)
df_csv = spark.read.csv("/content/employees.csv", header=True, inferSchema=True)
df_csv.show()

+------+-----+-------+---------+------+
|emp_id| name|   dept|     city|salary|
+------+-----+-------+---------+------+
|   101| Anil|     IT|Bangalore| 80000|
|   102|Kiran|     HR|   Mumbai| 65000|
|   103|Deepa|Finance|  Chennai| 72000|
+------+-----+-------+---------+------+



In [70]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType

json_data = '''{
  "id": 201,
  "name": "Nandini",
  "contact": {
    "email": "nandi@example.com",
    "city": "Hyderabad"
  },
  "skills": ["Python", "Spark", "SQL"]
}'''
with open("/content/employee.json", "w") as f:
    f.write(json_data)

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("contact", StructType([
        StructField("email", StringType(), True),
        StructField("city", StringType(), True)
    ]), True),
    StructField("skills", ArrayType(StringType()), True)
])

df_json = spark.read.schema(schema).json("/content/employee.json")
df_json.show(truncate=False)

+----+----+-------+------+
|id  |name|contact|skills|
+----+----+-------+------+
|NULL|NULL|NULL   |NULL  |
|NULL|NULL|NULL   |NULL  |
|NULL|NULL|NULL   |NULL  |
|NULL|NULL|NULL   |NULL  |
|NULL|NULL|NULL   |NULL  |
|NULL|NULL|NULL   |NULL  |
|NULL|NULL|NULL   |NULL  |
|NULL|NULL|NULL   |NULL  |
|NULL|NULL|NULL   |NULL  |
+----+----+-------+------+



In [71]:

# Flatten nested JSON
from pyspark.sql.functions import col, explode
df_flattened = df_json.select(
    col("id"),
    col("name"),
    col("contact.email").alias("email"),
    col("contact.city").alias("city"),
    explode(col("skills")).alias("skill")
)
df_flattened.show()


+---+----+-----+----+-----+
| id|name|email|city|skill|
+---+----+-----+----+-----+
+---+----+-----+----+-----+



In [74]:
df_csv.write.partitionBy("city").parquet("/content/output/employees/sa")
df_flattened.write.partitionBy("city").parquet("/content/output/employee_skills/sa")

In [77]:
df_students.createOrReplaceTempView("students_view")

In [81]:
spark.sql("SELECT section, AVG(marks) as avg_marks FROM students_view GROUP BY section").show()

+-------+---------+
|section|avg_marks|
+-------+---------+
|   10-A|     83.5|
|   10-B|     88.5|
|   10-C|     80.0|
+-------+---------+



In [84]:
spark.sql("SELECT section, name, marks FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY section ORDER BY marks DESC) as rn FROM students_view) WHERE rn = 1").show()

+-------+-----+-----+
|section| name|marks|
+-------+-----+-----+
|   10-A| Amit|   89|
|   10-B|Kavya|   92|
|   10-C|Sneha|   80|
+-------+-----+-----+



In [91]:
spark.sql("SELECT grade, COUNT(*) as student_count FROM (SELECT name, CASE WHEN marks > 90 THEN 'A' WHEN marks >= 80 THEN 'B' ELSE 'C' END as grade FROM students_view) GROUP BY grade").show()

+-----+-------------+
|grade|student_count|
+-----+-------------+
|    B|            3|
|    A|            1|
|    C|            1|
+-----+-------------+



In [95]:
spark.sql("SELECT * FROM students_view WHERE marks > (SELECT AVG(marks) FROM students_view)").show()


+-----+-------+-----+
| name|section|marks|
+-----+-------+-----+
| Amit|   10-A|   89|
|Kavya|   10-B|   92|
|Rohit|   10-B|   85|
+-----+-------+-----+



In [99]:

df_attendance.createOrReplaceTempView("attendance_view")
spark.sql("SELECT s.name, s.section, s.marks, a.days_present, (s.marks * (a.days_present/25)) as adj_perf FROM students_view s JOIN attendance_view a ON s.name = a.name").show()


+------+-------+-----+------------+------------------+
|  name|section|marks|days_present|          adj_perf|
+------+-------+-----+------------+------------------+
|  Amit|   10-A|   89|          24|             85.44|
|Anjali|   10-A|   78|          20|62.400000000000006|
| Kavya|   10-B|   92|          22|             80.96|
| Rohit|   10-B|   85|          25|              85.0|
| Sneha|   10-C|   80|          19|              60.8|
+------+-------+-----+------------+------------------+



In [100]:
df_students.write.partitionBy("section").parquet("/content/output/students/")


In [102]:
incremental = [("Tejas", "10-A", 91)]
df_inc = spark.createDataFrame(incremental, ["name", "section", "marks"])
df_inc.write.mode("append").partitionBy("section").parquet("/content/output/students/")


In [103]:
import os
for root, dirs, files in os.walk("/content/output/students/"):
    for file in files:
        print(os.path.join(root, file))


/content/output/students/._SUCCESS.crc
/content/output/students/_SUCCESS
/content/output/students/section=10-A/.part-00001-498ba4c0-371e-4b45-a179-a82263a78564.c000.snappy.parquet.crc
/content/output/students/section=10-A/.part-00000-2c2e59fa-08fd-4140-a93a-2baa846bd2b8.c000.snappy.parquet.crc
/content/output/students/section=10-A/part-00001-8301ea02-c4d1-4285-b978-880b0e761691.c000.snappy.parquet
/content/output/students/section=10-A/part-00001-2c2e59fa-08fd-4140-a93a-2baa846bd2b8.c000.snappy.parquet
/content/output/students/section=10-A/part-00001-498ba4c0-371e-4b45-a179-a82263a78564.c000.snappy.parquet
/content/output/students/section=10-A/part-00000-2c2e59fa-08fd-4140-a93a-2baa846bd2b8.c000.snappy.parquet
/content/output/students/section=10-A/.part-00001-2c2e59fa-08fd-4140-a93a-2baa846bd2b8.c000.snappy.parquet.crc
/content/output/students/section=10-A/.part-00001-8301ea02-c4d1-4285-b978-880b0e761691.c000.snappy.parquet.crc
/content/output/students/section=10-B/.part-00000-2c2e59fa-

In [108]:
df_10A = spark.read.parquet("/content/output/students/section=10-A/")
df_10A.show()


+------+-----+
|  name|marks|
+------+-----+
|Anjali|   78|
| Tejas|   91|
| Tejas|   91|
|  Amit|   89|
+------+-----+



In [111]:
print("Count after incremental load:", df_10A.count())


Count after incremental load: 4


In [113]:
etl_csv = "emp_id,name,dept,salary,bonus\n1,Arjun,IT,75000,5000\n2,Kavya,HR,62000,\n3,Sneha,Finance,68000,4000\n4,Ramesh,Sales,58000,"
with open("/content/etl.csv", "w") as f:
    f.write(etl_csv)
df_etl = spark.read.csv("/content/etl.csv", header=True, inferSchema=True)
df_etl.show()


+------+------+-------+------+-----+
|emp_id|  name|   dept|salary|bonus|
+------+------+-------+------+-----+
|     1| Arjun|     IT| 75000| 5000|
|     2| Kavya|     HR| 62000| NULL|
|     3| Sneha|Finance| 68000| 4000|
|     4|Ramesh|  Sales| 58000| NULL|
+------+------+-------+------+-----+



In [117]:
df_etl = df_etl.fillna({"bonus": 2000})
df_etl.show()


+------+------+-------+------+-----+
|emp_id|  name|   dept|salary|bonus|
+------+------+-------+------+-----+
|     1| Arjun|     IT| 75000| 5000|
|     2| Kavya|     HR| 62000| 2000|
|     3| Sneha|Finance| 68000| 4000|
|     4|Ramesh|  Sales| 58000| 2000|
+------+------+-------+------+-----+



In [121]:
df_etl = df_etl.withColumn("total_ctc", col("salary") + col("bonus"))
df_etl.show()


+------+------+-------+------+-----+---------+
|emp_id|  name|   dept|salary|bonus|total_ctc|
+------+------+-------+------+-----+---------+
|     1| Arjun|     IT| 75000| 5000|    80000|
|     2| Kavya|     HR| 62000| 2000|    64000|
|     3| Sneha|Finance| 68000| 4000|    72000|
|     4|Ramesh|  Sales| 58000| 2000|    60000|
+------+------+-------+------+-----+---------+



In [125]:
df_filtered = df_etl.filter(col("total_ctc") > 65000)
df_filtered.show()


+------+-----+-------+------+-----+---------+
|emp_id| name|   dept|salary|bonus|total_ctc|
+------+-----+-------+------+-----+---------+
|     1|Arjun|     IT| 75000| 5000|    80000|
|     3|Sneha|Finance| 68000| 4000|    72000|
+------+-----+-------+------+-----+---------+



In [139]:
df_filtered.write.json("/content/output/etl_json/sa")
df_filtered.write.partitionBy("dept").parquet("/content/output/etl_parquet/sa")
