In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Start Spark
spark = SparkSession.builder.appName("MidMarks").getOrCreate()

# Read the CSV 
df = spark.read.option("header", True).option("inferSchema", True).csv("C:/Users/intur/Desktop/INDRAKIRAN/MRU/4-1/BDA/MIDMARKS-Spark.csv")
df.show(10, truncate=False)
df.printSchema()


+----+-------+---+----+---+----+---+----+
|S.NO|SECTION|DV |M-II|PP |BEEE|FL |FIMS|
+----+-------+---+----+---+----+---+----+
|1   |ALPHA  |12 |0   |17 |9   |19 |15  |
|2   |ALPHA  |19 |12  |16 |16  |18 |3   |
|3   |ALPHA  |18 |14  |18 |18  |18 |16  |
|4   |ALPHA  |15 |9   |19 |17  |19 |15  |
|5   |ALPHA  |18 |17  |19 |19  |20 |18  |
|6   |ALPHA  |17 |16  |18 |10  |15 |9   |
|7   |ALPHA  |15 |10  |20 |20  |15 |14  |
|8   |ALPHA  |17 |17  |19 |20  |19 |13  |
|9   |ALPHA  |10 |18  |A  |20  |19 |15  |
|10  |ALPHA  |18 |19  |20 |20  |20 |15  |
+----+-------+---+----+---+----+---+----+
only showing top 10 rows

root
 |-- S.NO: integer (nullable = true)
 |-- SECTION: string (nullable = true)
 |-- DV: string (nullable = true)
 |-- M-II: string (nullable = true)
 |-- PP: string (nullable = true)
 |-- BEEE: string (nullable = true)
 |-- FL: string (nullable = true)
 |-- FIMS: string (nullable = true)



In [None]:
null_counts = df.select([
    F.count(F.when(F.col(f"`{c}`").isNull() | (F.col(f"`{c}`") == ""), c)).alias(c)
    for c in df.columns
])
null_counts.show()


+----+-------+---+----+---+----+---+----+
|S.NO|SECTION| DV|M-II| PP|BEEE| FL|FIMS|
+----+-------+---+----+---+----+---+----+
| 117|     27|  2|   2|  2|   2|  3|   2|
+----+-------+---+----+---+----+---+----+



In [13]:
import pyspark.sql.functions as F

# Count nulls and empty strings
null_counts = df.select([
    F.count(F.when(F.col(f"`{c}`").isNull() | (F.col(f"`{c}`") == ""), c)).alias(c)
    for c in df.columns
])
null_counts.show()


+----+-------+---+----+---+----+---+----+-----+
|S.NO|SECTION| DV|M-II| PP|BEEE| FL|FIMS|total|
+----+-------+---+----+---+----+---+----+-----+
| 117|     27|  2|   2|  2|   2|  3|   2|   36|
+----+-------+---+----+---+----+---+----+-----+



In [17]:
from pyspark.sql.types import NumericType

numeric_cols = [c for c, t in df.dtypes if isinstance(df.schema[c].dataType, NumericType)]
string_cols = [c for c in df.columns if c not in numeric_cols]


In [21]:
df = df.withColumnRenamed("S.NO", "SNO")


In [23]:
numeric_cols = [c for c, t in df.dtypes if t in ("int", "double", "float", "bigint")]
string_cols = [c for c in df.columns if c not in numeric_cols]

df = df.fillna(0, subset=numeric_cols)
df = df.fillna("Unknown", subset=string_cols)

for c in string_cols:
    df = df.withColumn(c, F.when(F.col(c) == "", "Unknown").otherwise(F.col(c)))


In [25]:
from pyspark.sql.types import NumericType

numeric_cols = [c for c, t in df.dtypes if t in ("int", "double", "float", "bigint")]
string_cols = [c for c in df.columns if c not in numeric_cols]
print("Numeric columns:", numeric_cols)
print("String columns:", string_cols)


Numeric columns: ['SNO', 'total']
String columns: ['SECTION', 'DV', 'M-II', 'PP', 'BEEE', 'FL', 'FIMS']


In [None]:

subject_cols = [c for c in numeric_cols if c not in ["SNO"]]

df = df.withColumn(
    "total",
    sum(F.col(c).cast("double") for c in subject_cols)
)


In [29]:
df.select("SNO", "SECTION", *subject_cols, "total").show(10, truncate=False)


+---+-------+-----+-----+
|SNO|SECTION|total|total|
+---+-------+-----+-----+
|1  |ALPHA  |72.0 |72.0 |
|2  |ALPHA  |84.0 |84.0 |
|3  |ALPHA  |102.0|102.0|
|4  |ALPHA  |94.0 |94.0 |
|5  |ALPHA  |111.0|111.0|
|6  |ALPHA  |85.0 |85.0 |
|7  |ALPHA  |94.0 |94.0 |
|8  |ALPHA  |105.0|105.0|
|9  |ALPHA  |0.0  |0.0  |
|10 |ALPHA  |112.0|112.0|
+---+-------+-----+-----+
only showing top 10 rows



In [35]:
if "total" in df.columns:
    df = df.drop("total")


In [None]:
subject_cols = ["DV", "M-II", "PP", "BEEE", "FL", "FIMS"]

for c in subject_cols:
    df = df.withColumn(c, F.col(c).cast("int"))


In [39]:
df = df.withColumn(
    "total",
    sum(F.col(c) for c in subject_cols)
)


In [41]:
df.select("SNO", "SECTION", *subject_cols, "total").show(10, truncate=False)


+---+-------+---+----+----+----+---+----+-----+
|SNO|SECTION|DV |M-II|PP  |BEEE|FL |FIMS|total|
+---+-------+---+----+----+----+---+----+-----+
|1  |ALPHA  |12 |0   |17  |9   |19 |15  |72   |
|2  |ALPHA  |19 |12  |16  |16  |18 |3   |84   |
|3  |ALPHA  |18 |14  |18  |18  |18 |16  |102  |
|4  |ALPHA  |15 |9   |19  |17  |19 |15  |94   |
|5  |ALPHA  |18 |17  |19  |19  |20 |18  |111  |
|6  |ALPHA  |17 |16  |18  |10  |15 |9   |85   |
|7  |ALPHA  |15 |10  |20  |20  |15 |14  |94   |
|8  |ALPHA  |17 |17  |19  |20  |19 |13  |105  |
|9  |ALPHA  |10 |18  |NULL|20  |19 |15  |NULL |
|10 |ALPHA  |18 |19  |20  |20  |20 |15  |112  |
+---+-------+---+----+----+----+---+----+-----+
only showing top 10 rows



In [None]:
from pyspark.sql.functions import col, sum as pysum, when

subject_cols = ["DV", "M-II", "PP", "BEEE", "FL", "FIMS"]

# Step 1: Replace nulls with zero in each subject column
for col_name in subject_cols:
    df = df.withColumn(col_name, when(col(col_name).isNull(), 0).otherwise(col(col_name)))
\
df = df.withColumn(
    "total",
    sum([col(c) for c in subject_cols])
)

df.select("SNO", "SECTION", *subject_cols, "total").show(10, truncate=False)


+---+-------+---+----+---+----+---+----+-----+
|SNO|SECTION|DV |M-II|PP |BEEE|FL |FIMS|total|
+---+-------+---+----+---+----+---+----+-----+
|1  |ALPHA  |12 |0   |17 |9   |19 |15  |72   |
|2  |ALPHA  |19 |12  |16 |16  |18 |3   |84   |
|3  |ALPHA  |18 |14  |18 |18  |18 |16  |102  |
|4  |ALPHA  |15 |9   |19 |17  |19 |15  |94   |
|5  |ALPHA  |18 |17  |19 |19  |20 |18  |111  |
|6  |ALPHA  |17 |16  |18 |10  |15 |9   |85   |
|7  |ALPHA  |15 |10  |20 |20  |15 |14  |94   |
|8  |ALPHA  |17 |17  |19 |20  |19 |13  |105  |
|9  |ALPHA  |10 |18  |0  |20  |19 |15  |82   |
|10 |ALPHA  |18 |19  |20 |20  |20 |15  |112  |
+---+-------+---+----+---+----+---+----+-----+
only showing top 10 rows



In [None]:
N = 20
row_count = df.count()

df.orderBy("SNO", ascending=False).limit(N).orderBy("SNO").show(truncate=False)


+---+-------+---+----+---+----+---+----+-----+
|SNO|SECTION|DV |M-II|PP |BEEE|FL |FIMS|total|
+---+-------+---+----+---+----+---+----+-----+
|582|SIGMA  |15 |5   |14 |4   |13 |16  |67   |
|583|SIGMA  |20 |16  |15 |18  |15 |18  |102  |
|584|SIGMA  |19 |20  |18 |17  |20 |15  |109  |
|585|SIGMA  |18 |15  |14 |13  |19 |14  |93   |
|586|SIGMA  |19 |7   |11 |12  |15 |13  |77   |
|587|SIGMA  |20 |17  |20 |15  |18 |18  |108  |
|588|SIGMA  |19 |19  |12 |8   |13 |19  |90   |
|589|SIGMA  |20 |20  |20 |20  |20 |18  |118  |
|590|SIGMA  |20 |17  |20 |18  |20 |18  |113  |
|591|SIGMA  |20 |15  |17 |13  |20 |17  |102  |
|592|SIGMA  |17 |20  |16 |16  |20 |19  |108  |
|593|SIGMA  |20 |18  |19 |14  |19 |16  |106  |
|594|SIGMA  |15 |6   |9  |3   |18 |15  |66   |
|595|SIGMA  |20 |17  |18 |20  |19 |20  |114  |
|596|SIGMA  |17 |14  |16 |18  |20 |18  |103  |
|597|SIGMA  |20 |20  |20 |20  |20 |20  |120  |
|598|SIGMA  |20 |20  |20 |19  |19 |18  |116  |
|599|SIGMA  |20 |20  |17 |17  |19 |18  |111  |
|600|SIGMA  |

In [None]:

df_filtered = df.filter((col("SNO") >= 100) & (col("SNO") <= 200))
df_filtered.show(truncate=False)


+---+-------+---+----+---+----+---+----+-----+------------------------------------+
|SNO|SECTION|DV |M-II|PP |BEEE|FL |FIMS|total|zero_subjects                       |
+---+-------+---+----+---+----+---+----+-----+------------------------------------+
|100|BETA   |9  |11  |9  |10  |13 |12  |64   |[NULL, NULL, NULL, NULL, NULL, NULL]|
|101|BETA   |10 |10  |10 |4   |15 |12  |61   |[NULL, NULL, NULL, NULL, NULL, NULL]|
|102|BETA   |13 |12  |18 |4   |18 |11  |76   |[NULL, NULL, NULL, NULL, NULL, NULL]|
|103|BETA   |10 |12  |17 |9   |0  |16  |64   |[NULL, NULL, NULL, NULL, FL, NULL]  |
|104|BETA   |11 |18  |20 |15  |20 |18  |102  |[NULL, NULL, NULL, NULL, NULL, NULL]|
|105|BETA   |17 |20  |19 |12  |20 |18  |106  |[NULL, NULL, NULL, NULL, NULL, NULL]|
|106|BETA   |12 |11  |18 |11  |15 |17  |84   |[NULL, NULL, NULL, NULL, NULL, NULL]|
|107|BETA   |9  |13  |17 |6   |18 |15  |78   |[NULL, NULL, NULL, NULL, NULL, NULL]|
|108|BETA   |11 |5   |19 |9   |18 |9   |71   |[NULL, NULL, NULL, NULL, NULL,

In [None]:
from pyspark.sql.functions import col, array, when, explode

subject_cols = ["DV", "M-II", "PP", "BEEE", "FL", "FIMS"]

# Array where subject names where the score is zero
zero_subjects = [when(col(c) == 0, c).otherwise(None) for c in subject_cols]
df = df.withColumn("zero_subjects", array(*zero_subjects))

#array so each row lists only one zero subject
zero_rows = df.select("SNO", explode("zero_subjects").alias("Subject"), "total")
result = zero_rows.filter(col("Subject").isNotNull())

result.show(truncate=False)


+---+-------+-----+
|SNO|Subject|total|
+---+-------+-----+
|1  |M-II   |72   |
|9  |PP     |82   |
|73 |M-II   |61   |
|83 |M-II   |4    |
|83 |BEEE   |4    |
|83 |FL     |4    |
|83 |FIMS   |4    |
|89 |PP     |39   |
|103|FL     |64   |
|140|M-II   |56   |
|145|M-II   |69   |
|157|PP     |55   |
|157|BEEE   |55   |
|161|M-II   |41   |
|170|M-II   |42   |
|177|M-II   |59   |
|185|M-II   |40   |
|188|M-II   |46   |
|191|M-II   |22   |
|193|M-II   |49   |
+---+-------+-----+
only showing top 20 rows



In [57]:
df.groupBy("SECTION").count().show()


+-------+-----+
|SECTION|count|
+-------+-----+
|   ZETA|   90|
|   BETA|   90|
|  OMEGA|   90|
|EPSILON|   88|
|Unknown|   27|
|  SIGMA|   63|
|  ALPHA|   90|
|  GAMMA|   90|
|  DELTA|   90|
+-------+-----+



In [61]:
df.groupBy("SECTION").count().show()


+-------+-----+
|SECTION|count|
+-------+-----+
|   ZETA|   90|
|   BETA|   90|
|  OMEGA|   90|
|EPSILON|   88|
|Unknown|   27|
|  SIGMA|   63|
|  ALPHA|   90|
|  GAMMA|   90|
|  DELTA|   90|
+-------+-----+



In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import col

counts_df = df.groupBy("SECTION").count()
counts_dict = {row['SECTION']: row['count'] for row in counts_df.collect() if row['SECTION'] is not None}

unknown_count = counts_dict.get("Unknown", 0)
sigma_count = counts_dict.get("SIGMA", 0)

new_sigma_count = sigma_count + unknown_count
df_filtered = df.filter(col("SECTION") != "Unknown")

new_sigma_row = Row(SECTION="SIGMA", count=new_sigma_count)

filtered_counts_df = df_filtered.groupBy("SECTION").count()

filtered_counts_no_sigma = filtered_counts_df.filter(col("SECTION") != "SIGMA")

updated_counts_df = filtered_counts_no_sigma.union(df.sparkSession.createDataFrame([new_sigma_row]))

updated_counts_df.show()


+-------+-----+
|SECTION|count|
+-------+-----+
|   ZETA|   90|
|   BETA|   90|
|  OMEGA|   90|
|EPSILON|   88|
|  ALPHA|   90|
|  GAMMA|   90|
|  DELTA|   90|
|  SIGMA|   90|
+-------+-----+



In [67]:
df.groupBy("DV").count().show()

+---+-----+
| DV|count|
+---+-----+
| 12|   41|
|  1|    3|
| 13|   30|
| 16|   74|
|  6|   12|
|  3|    1|
| 20|  103|
|  5|   11|
| 19|   60|
| 15|   63|
| 17|   79|
|  9|   20|
|  4|    4|
|  8|   11|
|  7|    8|
| 10|   26|
| 11|   43|
| 14|   41|
|  2|    6|
|  0|   13|
+---+-----+
only showing top 20 rows



In [69]:
df = df.withColumn(
"Grade",
when(df.total >= 80, "A")
.when(df.total >= 60, "B")
.when(df.total >= 40, "C")
.otherwise("F")
)
df.show()

+---+-------+---+----+---+----+---+----+-----+--------------------+-----+
|SNO|SECTION| DV|M-II| PP|BEEE| FL|FIMS|total|       zero_subjects|Grade|
+---+-------+---+----+---+----+---+----+-----+--------------------+-----+
|  1|  ALPHA| 12|   0| 17|   9| 19|  15|   72|[NULL, M-II, NULL...|    B|
|  2|  ALPHA| 19|  12| 16|  16| 18|   3|   84|[NULL, NULL, NULL...|    A|
|  3|  ALPHA| 18|  14| 18|  18| 18|  16|  102|[NULL, NULL, NULL...|    A|
|  4|  ALPHA| 15|   9| 19|  17| 19|  15|   94|[NULL, NULL, NULL...|    A|
|  5|  ALPHA| 18|  17| 19|  19| 20|  18|  111|[NULL, NULL, NULL...|    A|
|  6|  ALPHA| 17|  16| 18|  10| 15|   9|   85|[NULL, NULL, NULL...|    A|
|  7|  ALPHA| 15|  10| 20|  20| 15|  14|   94|[NULL, NULL, NULL...|    A|
|  8|  ALPHA| 17|  17| 19|  20| 19|  13|  105|[NULL, NULL, NULL...|    A|
|  9|  ALPHA| 10|  18|  0|  20| 19|  15|   82|[NULL, NULL, PP, ...|    A|
| 10|  ALPHA| 18|  19| 20|  20| 20|  15|  112|[NULL, NULL, NULL...|    A|
| 11|  ALPHA| 17|  18| 20|  18| 20|  1

In [73]:
df = df.withColumnRenamed("S.NO", "S_NO")


In [79]:
df = df.drop("TotalMarks120", "TotalMarks100")


In [81]:
df = df.withColumn(
"Grade",
when(df.total >= 80, "A")
.when(df.total >= 60, "B")
.when(df.total >= 40, "C")
.otherwise("F")
)
df.show()

+---+-------+---+----+---+----+---+----+-----+--------------------+-----+
|SNO|SECTION| DV|M-II| PP|BEEE| FL|FIMS|total|       zero_subjects|Grade|
+---+-------+---+----+---+----+---+----+-----+--------------------+-----+
|  1|  ALPHA| 12|   0| 17|   9| 19|  15|   72|[NULL, M-II, NULL...|    B|
|  2|  ALPHA| 19|  12| 16|  16| 18|   3|   84|[NULL, NULL, NULL...|    A|
|  3|  ALPHA| 18|  14| 18|  18| 18|  16|  102|[NULL, NULL, NULL...|    A|
|  4|  ALPHA| 15|   9| 19|  17| 19|  15|   94|[NULL, NULL, NULL...|    A|
|  5|  ALPHA| 18|  17| 19|  19| 20|  18|  111|[NULL, NULL, NULL...|    A|
|  6|  ALPHA| 17|  16| 18|  10| 15|   9|   85|[NULL, NULL, NULL...|    A|
|  7|  ALPHA| 15|  10| 20|  20| 15|  14|   94|[NULL, NULL, NULL...|    A|
|  8|  ALPHA| 17|  17| 19|  20| 19|  13|  105|[NULL, NULL, NULL...|    A|
|  9|  ALPHA| 10|  18|  0|  20| 19|  15|   82|[NULL, NULL, PP, ...|    A|
| 10|  ALPHA| 18|  19| 20|  20| 20|  15|  112|[NULL, NULL, NULL...|    A|
| 11|  ALPHA| 17|  18| 20|  18| 20|  1

In [83]:
df = df.drop("Grade")


In [85]:
df.show()

+---+-------+---+----+---+----+---+----+-----+--------------------+
|SNO|SECTION| DV|M-II| PP|BEEE| FL|FIMS|total|       zero_subjects|
+---+-------+---+----+---+----+---+----+-----+--------------------+
|  1|  ALPHA| 12|   0| 17|   9| 19|  15|   72|[NULL, M-II, NULL...|
|  2|  ALPHA| 19|  12| 16|  16| 18|   3|   84|[NULL, NULL, NULL...|
|  3|  ALPHA| 18|  14| 18|  18| 18|  16|  102|[NULL, NULL, NULL...|
|  4|  ALPHA| 15|   9| 19|  17| 19|  15|   94|[NULL, NULL, NULL...|
|  5|  ALPHA| 18|  17| 19|  19| 20|  18|  111|[NULL, NULL, NULL...|
|  6|  ALPHA| 17|  16| 18|  10| 15|   9|   85|[NULL, NULL, NULL...|
|  7|  ALPHA| 15|  10| 20|  20| 15|  14|   94|[NULL, NULL, NULL...|
|  8|  ALPHA| 17|  17| 19|  20| 19|  13|  105|[NULL, NULL, NULL...|
|  9|  ALPHA| 10|  18|  0|  20| 19|  15|   82|[NULL, NULL, PP, ...|
| 10|  ALPHA| 18|  19| 20|  20| 20|  15|  112|[NULL, NULL, NULL...|
| 11|  ALPHA| 17|  18| 20|  18| 20|  18|  111|[NULL, NULL, NULL...|
| 12|  ALPHA| 20|  20| 20|  20| 19|  16|  115|[N

In [87]:
from pyspark.sql.functions import when, col

df = df.withColumn(
    "Grade",
    when(col("total") >= 96, "A")  # 80% of 120 = 96
    .when(col("total") >= 72, "B")  # 60% of 120 = 72
    .when(col("total") >= 48, "C")  # 40% of 120 = 48
    .otherwise("F")
)

df.show()


+---+-------+---+----+---+----+---+----+-----+--------------------+-----+
|SNO|SECTION| DV|M-II| PP|BEEE| FL|FIMS|total|       zero_subjects|Grade|
+---+-------+---+----+---+----+---+----+-----+--------------------+-----+
|  1|  ALPHA| 12|   0| 17|   9| 19|  15|   72|[NULL, M-II, NULL...|    B|
|  2|  ALPHA| 19|  12| 16|  16| 18|   3|   84|[NULL, NULL, NULL...|    B|
|  3|  ALPHA| 18|  14| 18|  18| 18|  16|  102|[NULL, NULL, NULL...|    A|
|  4|  ALPHA| 15|   9| 19|  17| 19|  15|   94|[NULL, NULL, NULL...|    B|
|  5|  ALPHA| 18|  17| 19|  19| 20|  18|  111|[NULL, NULL, NULL...|    A|
|  6|  ALPHA| 17|  16| 18|  10| 15|   9|   85|[NULL, NULL, NULL...|    B|
|  7|  ALPHA| 15|  10| 20|  20| 15|  14|   94|[NULL, NULL, NULL...|    B|
|  8|  ALPHA| 17|  17| 19|  20| 19|  13|  105|[NULL, NULL, NULL...|    A|
|  9|  ALPHA| 10|  18|  0|  20| 19|  15|   82|[NULL, NULL, PP, ...|    B|
| 10|  ALPHA| 18|  19| 20|  20| 20|  15|  112|[NULL, NULL, NULL...|    A|
| 11|  ALPHA| 17|  18| 20|  18| 20|  1