In [7]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

filepath = "/content/st_scores.csv"

#read the raw CSV file - a spark DataFrame
raw_stdata = spark.read.format("csv") \
                      .option("inferSchema", "true") \
                      .option("header", "true") \
                      .load(filepath) \
                      .withColumnRenamed("Class Score","ClassScore") \
                      .withColumnRenamed("Test Score","TestScore")

#checking schema - ensuring whether everything is gone good or not
raw_stdata.printSchema()

#checking data
raw_stdata.show(5)

root
 |-- Student: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- ClassScore: integer (nullable = true)
 |-- TestScore: double (nullable = true)

+-------+---------+----------+---------+
|Student|  Subject|ClassScore|TestScore|
+-------+---------+----------+---------+
|  James|     Math|        95|   65.175|
|  James|Chemistry|        50|    32.45|
|  James|  Physics|        48|   37.675|
|  James|  Biology|        75|   76.725|
|   Lora|     Math|        45|   49.225|
+-------+---------+----------+---------+
only showing top 5 rows



In [11]:
fileout = "/content/partitioned_st" # Changed to a local path

raw_stdata.write \
          .format("parquet") \
          .mode("overwrite") \
          .option("compression", "gzip") \
          .partitionBy("Subject") \
          .save(fileout)

In [13]:
st_data = spark.read.parquet(fileout)

print("# of partitions in dataset : " + str(st_data.rdd.getNumPartitions()))

# of partitions in dataset : 2


In [15]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F # Import functions module

# ... your existing code ...

tot_score = st_data.withColumn("TotScore",
                        F.col("ClassScore") # Use F.col to refer to columns
                        + F.col("TestScore"))

tot_score.show(5)

print("--------Explain--------")
tot_score.explain() # Call explain as a function
print("--------End of Explain--------")
print("# of partitions in dataset : " + str(tot_score.rdd.getNumPartitions())) # Convert to string

+-------+----------+---------+-------+--------+
|Student|ClassScore|TestScore|Subject|TotScore|
+-------+----------+---------+-------+--------+
|  James|        95|   65.175|   Math| 160.175|
|   Lora|        45|   49.225|   Math|  94.225|
|   Leny|        36|   65.175|   Math| 101.175|
|   Lisa|        33|    78.65|   Math|  111.65|
|  Elvis|        27|     33.0|   Math|    60.0|
+-------+----------+---------+-------+--------+
only showing top 5 rows

--------Explain--------
== Physical Plan ==
*(1) Project [Student#74, ClassScore#75, TestScore#76, Subject#77, (cast(ClassScore#75 as double) + TestScore#76) AS TotScore#82]
+- *(1) ColumnarToRow
   +- FileScan parquet [Student#74,ClassScore#75,TestScore#76,Subject#77] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/partitioned_st], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Student:string,ClassScore:int,TestScore:double>


--------End of Explain--------
# of partitions

In [19]:
physics_score = tot_score.filter(tot_score["Subject"] == "Physics")

physics_score.show()

print("--------Explain--------")
physics_score.explain
print("--------End of Explain--------")
print("# of partitions in dataset : " , physics_score.rdd.getNumPartitions)

+-------+----------+---------+-------+---------+
|Student|ClassScore|TestScore|Subject| TotScore|
+-------+----------+---------+-------+---------+
|  James|        48|   37.675|Physics|   85.675|
|   Lora|        34|     74.8|Physics|    108.8|
|   Leny|        93|   79.475|Physics|  172.475|
|   Lisa|        42|    64.35|Physics|   106.35|
|  Elvis|        82|     77.0|Physics|    159.0|
|Micheal|        48| 31.68125|Physics| 79.68125|
| Daniel|        34|     62.9|Physics|     96.9|
|   Dave|        93| 66.83125|Physics|159.83125|
|   Roby|        42|  54.1125|Physics|  96.1125|
| Pamela|        82|    64.75|Physics|   146.75|
+-------+----------+---------+-------+---------+

--------Explain--------
--------End of Explain--------
# of partitions in dataset :  <bound method RDD.getNumPartitions of MapPartitionsRDD[55] at javaToPython at NativeMethodAccessorImpl.java:0>


In [23]:
avgscore = tot_score.groupBy("Student").avg("TotScore")

avgscore.show()

print("--------Explain--------")
avgscore.explain
print("--------End of Explain--------")
print("# of partitions in dataset : " , avgscore.rdd.getNumPartitions)

+-------+------------------+
|Student|     avg(TotScore)|
+-------+------------------+
|  James|         120.00625|
|   Dave|       121.0359375|
|  Elvis|113.60624999999999|
| Pamela|       103.2484375|
|   Lora| 99.23124999999999|
|   Roby|         89.478125|
|   Leny|         131.30625|
| Daniel| 90.40468750000001|
|Micheal|115.45625000000001|
|   Lisa| 97.98750000000001|
+-------+------------------+

--------Explain--------
--------End of Explain--------
# of partitions in dataset :  <bound method RDD.getNumPartitions of MapPartitionsRDD[78] at javaToPython at NativeMethodAccessorImpl.java:0>


In [36]:
from pyspark.sql.functions import col, max

# finding top score
top = tot_score.groupBy("Subject").agg(max("TotScore").alias("Tops"))
top.show()

# sorting out the students with top scores
top_student = tot_score.alias("t1").join(top.alias("t2"),
                                         (col("t2.Tops") == col("t1.TotScore")) & (col("t2.Subject") == col("t1.Subject"))) \
    .select("t1.Subject", "t1.Student", "t2.Tops")
top_student.show()

print("--------Explain--------")
top_student.explain()
print("--------End of Explain--------")
print("# of partitions in dataset:", top_student.rdd.getNumPartitions())


+---------+-------+
|  Subject|   Tops|
+---------+-------+
|     Math|160.175|
|  Biology|151.725|
|Chemistry| 120.65|
|  Physics|172.475|
+---------+-------+

+---------+-------+-------+
|  Subject|Student|   Tops|
+---------+-------+-------+
|     Math|  James|160.175|
|     Math|Micheal|160.175|
|  Biology|  James|151.725|
|  Physics|   Leny|172.475|
|Chemistry|   Leny| 120.65|
+---------+-------+-------+

--------Explain--------
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Subject#77, Student#74, Tops#214]
   +- BroadcastHashJoin [knownfloatingpointnormalized(normalizenanandzero(TotScore#82)), Subject#77], [knownfloatingpointnormalized(normalizenanandzero(Tops#214)), Subject#233], Inner, BuildRight, false
      :- Project [Student#74, Subject#77, (cast(ClassScore#75 as double) + TestScore#76) AS TotScore#82]
      :  +- Filter isnotnull((cast(ClassScore#75 as double) + TestScore#76))
      :     +- FileScan parquet [Student#74,ClassScore#75,TestScore#76,Subj