In [1]:
# Bootstrap step that runs before the pyspark import in the next cell.
# NOTE(review): per findspark's documentation, init() locates the local Spark
# installation and adds its pyspark package to sys.path -- confirm that
# SPARK_HOME (or a discoverable install) exists on the target machine.
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession


# Obtain the Spark session for this notebook under the app name "StudentGrades".
spark = (
    SparkSession.builder
    .appName("StudentGrades")
    .getOrCreate()
)

# Raw marks: one (student name, {subject: numeric score}) pair per student.
scores = [
    ("Alice", dict(Math=85, Science=90, English=80)),
    ("Bob", dict(Math=70, Science=75, English=85)),
    ("Charlie", dict(Math=60, Science=65, English=70)),
    ("David", dict(Math=90, Science=95, English=85)),
    ("Eve", dict(Math=75, Science=80, English=75)),
]

# Distribute the in-memory list as an RDD so it can be transformed with Spark.
scores_rdd = spark.sparkContext.parallelize(scores)


# Letter grade -> (lowest score, highest score) for that grade.
# The bounds are written as whole numbers; compute_grade() treats each lower
# bound as a threshold so fractional scores between two bands still get a grade.
grading_scheme = {
    "A": (80, 100),
    "B": (60, 79),
    "C": (40, 59),
    "D": (0, 39),
}


def compute_grade(score):
    """Return the letter grade for a numeric score.

    The score is compared against the lower bounds in ``grading_scheme``,
    highest band first, and the first band whose lower bound is reached wins.
    Matching on lower bounds only (instead of ``lower <= score <= upper``)
    closes the gaps between the integer bands, so a fractional score such as
    79.5 is graded "B" rather than falling through every band.

    Args:
        score: Numeric score (int or float).

    Returns:
        The grade key from ``grading_scheme``, or "F" when the score lies
        above the highest upper bound, below the lowest lower bound, or the
        scheme is empty.
    """
    if not grading_scheme:
        return "F"

    # Scores above the top of the scale are invalid, not an "A".
    highest_allowed = max(upper for _, upper in grading_scheme.values())
    if score > highest_allowed:
        return "F"

    # Walk the bands from the highest lower bound down; first threshold met wins.
    bands = sorted(
        grading_scheme.items(),
        key=lambda item: item[1][0],
        reverse=True,
    )
    for grade, (lower_bound, _upper_bound) in bands:
        if score >= lower_bound:
            return grade
    return "F"

def _grade_subjects(record):
    """Turn (student, {subject: score}) into (student, {subject: grade})."""
    student, subject_scores = record[0], record[1]
    graded = {}
    for subject, mark in subject_scores.items():
        graded[subject] = compute_grade(mark)
    return (student, graded)


def _explode_grades(record):
    """Expand (student, {subject: grade}) into one (student, subject, grade) row per subject."""
    student, subject_grades = record[0], record[1]
    return [(student, subject, grade) for subject, grade in subject_grades.items()]


# Per-student mapping of subject -> letter grade.
grades_rdd = scores_rdd.map(_grade_subjects)

# Flatten to one row per (student, subject) and give the columns names.
grade_rows = grades_rdd.flatMap(_explode_grades)
column_names = ["Student", "Subject", "Grade"]
grades_df = spark.createDataFrame(grade_rows, column_names)

grades_df.show()

# Stop the Spark session now that the result has been displayed.
spark.stop()


+-------+-------+-----+
|Student|Subject|Grade|
+-------+-------+-----+
|  Alice|   Math|    A|
|  Alice|Science|    A|
|  Alice|English|    A|
|    Bob|   Math|    B|
|    Bob|Science|    B|
|    Bob|English|    A|
|Charlie|   Math|    B|
|Charlie|Science|    B|
|Charlie|English|    B|
|  David|   Math|    A|
|  David|Science|    A|
|  David|English|    A|
|    Eve|   Math|    B|
|    Eve|Science|    A|
|    Eve|English|    B|
+-------+-------+-----+

