# PYSPARK - ASSESSMENT - O1

**1. PySpark Setup & Initialization**

In [5]:
# 1.1
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("BotCampus Intermediate Session") \
.master("local[*]") \
.getOrCreate()


In [6]:
# 1.2
data = [("Ananya", "Bangalore", 24),
        ("Ravi", "Hyderabad", 28),
        ("Kavya", "Delhi", 22),
        ("Meena", "Chennai", 25)]

columns = ["name", "city", "age"]
df = spark.createDataFrame(data, columns)
df.show()


+------+---------+---+
|  name|     city|age|
+------+---------+---+
|Ananya|Bangalore| 24|
|  Ravi|Hyderabad| 28|
| Kavya|    Delhi| 22|
| Meena|  Chennai| 25|
+------+---------+---+



**2. RDDs & Transformations**

In [77]:
from os import linesep
# 2.1
feedback = spark.sparkContext.parallelize([
"Ravi from Bangalore loved the mobile app",
"Meena from Delhi reported poor response time",
"Ajay from Pune liked the delivery speed",
"Ananya from Hyderabad had an issue with UI",
"Rohit from Mumbai gave positive feedback"
])

# 2.1.1
words=feedback.flatMap(lambda line:line.lower().split())
total_words=words.count()
print("Total word count:",total_words)

# 2.1.2
word_counts=words.map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
top_3=word_counts.takeOrdered(3,key=lambda x:-x[1])
print("\nTop 3 common words:\n",top_3)

# 2.1.3
stopwords = {"from", "with", "the", "and", "an", "had", "was", "of"}
filtered_words=words.filter(lambda word:word.lower() not in stopwords)
filtered_counts=filtered_words.map(lambda word:(word,1)) \
              .reduceByKey(lambda a,b:a+b)
print("\nStop Words removed successfully")

# 2.1.4
word_count_dict=dict(filtered_counts.collect())
print("\nWord-Count Dictionary:\n",word_count_dict)

Total word count: 35

Top 3 common words:
 [('from', 5), ('the', 2), ('loved', 1)]

Stop Words removed successfully

Word-Count Dictionary:
 {'loved': 1, 'app': 1, 'poor': 1, 'response': 1, 'liked': 1, 'speed': 1, 'ananya': 1, 'issue': 1, 'rohit': 1, 'mumbai': 1, 'positive': 1, 'feedback': 1, 'ravi': 1, 'bangalore': 1, 'mobile': 1, 'meena': 1, 'delhi': 1, 'reported': 1, 'time': 1, 'ajay': 1, 'pune': 1, 'delivery': 1, 'hyderabad': 1, 'ui': 1, 'gave': 1}


**3. DataFrames - Transformations**


In [20]:
# 3.1
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

scores = [
("Ravi", "Math", 88),
("Ananya", "Science", 92),
("Kavya", "English", 79),
("Ravi", "English", 67),
("Neha", "Math", 94),
("Meena", "Science", 85)
]

columns = ["name","subject","score"]
df_scores =  spark.createDataFrame(scores, columns)

# 3.1.1
df_scores = df_scores.withColumn('grade', when(col('score') >= 90, "A")\
                                 .when(col('score') >= 80, "B")\
                                 .when(col('score') >= 70,"C")\
                                 .otherwise("D"))
print("Added new Grade column:\n")
df_scores.show()

# 3.1.2
print("Average Subject Scores:\n")
df_scores.groupBy('subject').agg(avg('score').alias("average_score")).show()

# 3.1.3
df_scores = df_scores.withColumn('difficulty',
                    when(col('subject').isin('Math','Science'), "Difficult")\
                    .otherwise("Easy"))
print("Added new Difficulty column:\n")
df_scores.show()

# 3.1.4
window = Window.partitionBy('subject').orderBy(col('score').desc())
df_scores = df_scores.withColumn('rank', rank().over(window))

print("Students Rank:\n")
df_scores.show()

# 3.1.5
# My user derfined funtion: Upper Case conversion
def to_upper(word):
  if word is not None:
    return word.upper()
  else: return None

uppercase_udf = udf(to_upper, StringType())

df_scores = df_scores.withColumn('name', uppercase_udf(col('name')))
print("Uppercase Name:\n")
df_scores.show()


Added new Grade column:

+------+-------+-----+-----+
|  name|subject|score|grade|
+------+-------+-----+-----+
|  Ravi|   Math|   88|    B|
|Ananya|Science|   92|    A|
| Kavya|English|   79|    C|
|  Ravi|English|   67|    D|
|  Neha|   Math|   94|    A|
| Meena|Science|   85|    B|
+------+-------+-----+-----+

Average Subject Scores:

+-------+-------------+
|subject|average_score|
+-------+-------------+
|Science|         88.5|
|   Math|         91.0|
|English|         73.0|
+-------+-------------+

Added new Difficulty column:

+------+-------+-----+-----+----------+
|  name|subject|score|grade|difficulty|
+------+-------+-----+-----+----------+
|  Ravi|   Math|   88|    B| Difficult|
|Ananya|Science|   92|    A| Difficult|
| Kavya|English|   79|    C|      Easy|
|  Ravi|English|   67|    D|      Easy|
|  Neha|   Math|   94|    A| Difficult|
| Meena|Science|   85|    B| Difficult|
+------+-------+-----+-----+----------+

Students Rank:

+------+-------+-----+-----+----------+----

**4. Ingest CSV & JSON - Save to Parquet**

In [33]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [36]:
studPath = "/content/drive/MyDrive/Hexware_Training_DataEngineering/Aug05-Day12/Assessment01/students.csv"
empPath = "/content/drive/MyDrive/Hexware_Training_DataEngineering/Aug05-Day12/Assessment01/employee_nested.json"

studDF = spark.read.csv(studPath, header = True, inferSchema = True)
empDF = spark.read.option("multiline", "true").json(empPath)

print("Student Schema:\n")
studDF.printSchema()
studDF.show()

print("Employee Schema:\n")
empDF.printSchema()
empDF.show(truncate = False)

Student Schema:

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)

+---+-----+----------+---------+------+
| id| name|department|     city|salary|
+---+-----+----------+---------+------+
|  1| Amit|        IT|Bangalore| 78000|
|  2|Kavya|        HR|  Chennai| 62000|
|  3|Arjun|   Finance|Hyderabad| 55000|
+---+-----+----------+---------+------+

Employee Schema:

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pincode: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+---+-----+---------------+
|address         |id |name |skills         |
+----------------+---+-----+---------------+
|{Mumbai, 400001}|101|Sneha|[Python, Spark]|
+----------------+---+-----+-

In [38]:
# 4.3
flattened_emp = empDF.select(
    col('id'),
    col('name'),
    col('address.city').alias('city'),
    col('address.pincode').alias('pincode'),
    explode(col('skills')).alias('skill')
)

print("Flattened Employee Json:\n")
flattened_emp.printSchema()
flattened_emp.show()

Flattened Employee Json:

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- pincode: long (nullable = true)
 |-- skill: string (nullable = true)

+---+-----+------+-------+------+
| id| name|  city|pincode| skill|
+---+-----+------+-------+------+
|101|Sneha|Mumbai| 400001|Python|
|101|Sneha|Mumbai| 400001| Spark|
+---+-----+------+-------+------+



In [39]:
# 4.4
studOutPath = ("/content/drive/MyDrive/Hexware_Training_DataEngineering/"
              "Aug05-Day12/Assessment01/Output/students_parqueet.csv"
              )
empOutPath = ("/content/drive/MyDrive/Hexware_Training_DataEngineering/"
              "Aug05-Day12/Assessment01/Output/employees_parqueet.csv"
              )

studDF.coalesce(1).write.mode("overwrite").parquet(studOutPath)
flattened_emp.coalesce(1).write.mode("overwrite").parquet(empOutPath)
print('Files Saved successfully')


Files Saved successfully


**5. SPARK SQL - Temp Views & Queries**

In [50]:
# 5.1
# a
df_scores.createOrReplaceTempView("exam_scores")

print("Exam Scores:\n")
spark.sql("SELECT * FROM exam_scores").show()

print("\nTop Scorers per subject:\n")
spark.sql("""SELECT * From exam_scores WHERE rank = 1""").show()

# b
print("\nStudent Count per Grade:\n")
spark.sql("""
SELECT grade, COUNT(name) AS student_count
FROM exam_scores
GROUP BY grade
ORDER BY grade""").show()

# c
print("\nStudents with more than one subject:\n")
spark.sql("""
SELECT name, Count(subject) AS subject_count
FROM exam_scores
GROUP BY name
HAVING subject_count > 1
""").show()


# d
print("\nAverage Score per Subject:\n")
spark.sql("""
SELECT subject, AVG(score) AS average_score
FROM exam_scores
GROUP BY subject
HAVING average_score > 85
""").show()

Exam Scores:

+------+-------+-----+-----+----------+----+
|  name|subject|score|grade|difficulty|rank|
+------+-------+-----+-----+----------+----+
| KAVYA|English|   79|    C|      Easy|   1|
|  RAVI|English|   67|    D|      Easy|   2|
|  NEHA|   Math|   94|    A| Difficult|   1|
|  RAVI|   Math|   88|    B| Difficult|   2|
|ANANYA|Science|   92|    A| Difficult|   1|
| MEENA|Science|   85|    B| Difficult|   2|
+------+-------+-----+-----+----------+----+


Top Scorers per subject:

+------+-------+-----+-----+----------+----+
|  name|subject|score|grade|difficulty|rank|
+------+-------+-----+-----+----------+----+
| KAVYA|English|   79|    C|      Easy|   1|
|  NEHA|   Math|   94|    A| Difficult|   1|
|ANANYA|Science|   92|    A| Difficult|   1|
+------+-------+-----+-----+----------+----+


Student Count per Grade:

+-----+-------------+
|grade|student_count|
+-----+-------------+
|    A|            2|
|    B|            2|
|    C|            1|
|    D|            1|
+-----+----

In [55]:
# 5.2
attendance = [
    ("KAVYA", 21),
    ("RAVI", 22),
    ("NEHA", 25),
    ("ANANYA", 18),
    ("MEENA", 15)
]

columns = ["name","days_present"]
df_attendance = spark.createDataFrame(attendance, columns)

df_attendance.createOrReplaceTempView("attendance")

# a
print("Joined data:\n")
spark.sql("""
SELECT e.*, a.days_present
FROM exam_scores e
LEFT JOIN attendance a
ON e.name = a.name
""").show()

# b
print("\nUpdated Grades:\n")
spark.sql("""
SELECT e.name, e.subject, e.score, a.days_present, e.grade,
  CASE
    WHEN a.days_present < 20 AND e.grade = 'A' THEN 'B'
    WHEN a.days_present < 20 AND e.grade = 'B' THEN 'C'
    WHEN a.days_present < 20 AND e.grade = 'C' THEN 'D'
    WHEN a.days_present < 20 AND e.grade = 'D' THEN 'E'
    ELSE e.grade
  END AS updated_grade
FROM exam_scores e
LEFT JOIN attendance a
ON e.name = a.name
""").show()


Joined data:

+------+-------+-----+-----+----------+----+------------+
|  name|subject|score|grade|difficulty|rank|days_present|
+------+-------+-----+-----+----------+----+------------+
|  NEHA|   Math|   94|    A| Difficult|   1|          25|
| MEENA|Science|   85|    B| Difficult|   2|          15|
|  RAVI|English|   67|    D|      Easy|   2|          22|
|  RAVI|   Math|   88|    B| Difficult|   2|          22|
|ANANYA|Science|   92|    A| Difficult|   1|          18|
| KAVYA|English|   79|    C|      Easy|   1|          21|
+------+-------+-----+-----+----------+----+------------+


Updated Grades:

+------+-------+-----+------------+-----+-------------+
|  name|subject|score|days_present|grade|updated_grade|
+------+-------+-----+------------+-----+-------------+
|  RAVI|   Math|   88|          22|    B|            B|
|ANANYA|Science|   92|          18|    A|            B|
| KAVYA|English|   79|          21|    C|            C|
|  NEHA|   Math|   94|          25|    A|          

**6. Partitioned Load (Full + Incremental)**

In [62]:
# Initial load
outPath = "/tmp/scores/"
df_scores.write.partitionBy("subject").mode("overwrite").parquet(outPath)

# Incremental load
incremental = [("Meena", "Math", 93)]
columns = ["name", "subject", "score"]
df_inc = spark.createDataFrame(incremental, columns)
df_inc.write.mode("append").partitionBy("subject").parquet("/tmp/scores/")

df_inc.printSchema()

root
 |-- name: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- score: long (nullable = true)



In [63]:
# a
import os
print("Folders in /tmp/scores/:\n")
print(os.listdir("/tmp/scores/"))

# b
math_df = spark.read.parquet("/tmp/scores/subject=Math")
print("\nMath Scores:\n")
math_df.show()



Folders in /tmp/scores/:

['._SUCCESS.crc', 'subject=Science', 'subject=English', '_SUCCESS', 'subject=Math']

Math Scores:

+-----+-----+-----+----------+----+
| name|score|grade|difficulty|rank|
+-----+-----+-----+----------+----+
| NEHA|   94|    A| Difficult|   1|
| RAVI|   88|    B| Difficult|   2|
|Meena|   93| NULL|      NULL|NULL|
+-----+-----+-----+----------+----+



**7. ETL: Clean, Transform, Load**

In [69]:
# a
empPath = ("/content/drive/MyDrive/Hexware_Training_DataEngineering/"
          "Aug05-Day12/Assessment01/employee.csv"
          )
df_employee = spark.read.csv(empPath, header = True, inferSchema= True)
df_employee.show()

+------+-----+-------+------+-----+
|emp_id| name|   dept|salary|bonus|
+------+-----+-------+------+-----+
|     1|Arjun|     IT| 78000| 5000|
|     2|Kavya|     HR| 62000| NULL|
|     3|Sneha|Finance| 55000| 3000|
+------+-----+-------+------+-----+



In [70]:
# b
df_employee = df_employee.fillna({"bonus": 2000})

In [71]:
# c
df_employee = df_employee.withColumn("total_ctc", col('salary')+col('bonus'))
df_employee.show()

+------+-----+-------+------+-----+---------+
|emp_id| name|   dept|salary|bonus|total_ctc|
+------+-----+-------+------+-----+---------+
|     1|Arjun|     IT| 78000| 5000|    83000|
|     2|Kavya|     HR| 62000| 2000|    64000|
|     3|Sneha|Finance| 55000| 3000|    58000|
+------+-----+-------+------+-----+---------+



In [72]:
# d
df_employee.filter(col('total_ctc') > 60000).show()

+------+-----+----+------+-----+---------+
|emp_id| name|dept|salary|bonus|total_ctc|
+------+-----+----+------+-----+---------+
|     1|Arjun|  IT| 78000| 5000|    83000|
|     2|Kavya|  HR| 62000| 2000|    64000|
+------+-----+----+------+-----+---------+



In [74]:
# e
empOutPath1 = ("/content/drive/MyDrive/Hexware_Training_DataEngineering/"
              "Aug05-Day12/Assessment01/Output1/employee.parquet"
              )
empOutPath2 = ("/content/drive/MyDrive/Hexware_Training_DataEngineering/"
              "Aug05-Day12/Assessment01/Output2/employee.json"
              )

df_employee.write.mode("overwrite").parquet(empOutPath1)
df_employee.write.mode("overwrite").json(empOutPath2)
print("Files saved successsfully")

Files saved successsfully
