In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [11]:
spark = SparkSession.builder.appName("HogwartsAnalysis").getOrCreate()

data = [
    ("Gryffindor", 1, 80, "Harry Potter"),
    ("Slytherin", 1, 60, "Draco Malfoy"),
    ("Ravenclaw", 1, 45, "Luna Lovegood"),
    ("Hufflepuff", 1, 30, "Cedric Diggory"),
    ("Gryffindor", 2, 90, "Hermione Granger"),
    ("Slytherin", 2, 70, "Pansy Parkinson"),
    ("Ravenclaw", 2, 55, "Cho Chang"),
    ("Hufflepuff", 2, 65, "Hannah Abbott"),
    ("Gryffindor", 3, 20, "Ron Weasley"),
    ("Slytherin", 3, 85, "Blaise Zabini")
]

columns = ["house", "year", "points", "student"]
df = spark.createDataFrame(data, columns)

res_df = ( df.select("house", "year", "points").filter(col("points")>50)
            .groupBy("house", "year")
            .agg(sum("points").alias("total_p"))
            .orderBy(desc("year"), desc("total_p"))
)
res_df.show()

+----------+----+-------+
|     house|year|total_p|
+----------+----+-------+
| Slytherin|   3|     85|
|Gryffindor|   2|     90|
| Slytherin|   2|     70|
|Hufflepuff|   2|     65|
| Ravenclaw|   2|     55|
|Gryffindor|   1|     80|
| Slytherin|   1|     60|
+----------+----+-------+



In [17]:
data = [
    ("Aragorn", "Human", 10, 2, "Helms Deep"),
    ("Legolas", "Elf", 15, 0, "Helms Deep"),
    ("Gimli", "Dwarf", 8, 3, "Helms Deep"),
    ("Frodo", "Hobbit", 2, 1, "Moria"),
    ("Sam", "Hobbit", 4, 2, "Moria"),
    ("Gandalf", "Wizard", 12, 1, "Moria"),
    ("Boromir", "Human", 7, 4, "Amon Hen"),
    ("Legolas", "Elf", 20, 0, "Amon Hen"),
    ("Aragorn", "Human", 9, 2, "Amon Hen")
]

columns = ["name", "race", "enemies_defeated", "injuries", "battle"]
df = spark.createDataFrame(data, columns)

res_df = (
    df.select("name","race","enemies_defeated").filter(col("enemies_defeated")>5)
    .groupBy("race").agg(round(avg("enemies_defeated")).alias("avg_enemie_def"))
    .orderBy(desc("avg_enemie_def"))
)
res_df.show()

+------+--------------+
|  race|avg_enemie_def|
+------+--------------+
|   Elf|          18.0|
|Wizard|          12.0|
| Human|           9.0|
| Dwarf|           8.0|
+------+--------------+



In [16]:
data = [
    ("SUB-01", "Pacific Strike", 5, "Success"),
    ("SUB-02", "Atlantic Surge", 2, "Failure"),
    ("SUB-01", "Arctic Blitz", 4, "Success"),
    ("SUB-03", "Indian Ocean", 6, "Success"),
    ("SUB-02", "Pacific Strike", 3, "Success"),
    ("SUB-01", "Coral Sea", 7, "Success"),
    ("SUB-03", "Arctic Blitz", 1, "Failure"),
    ("SUB-02", "Bering Strait", 5, "Success")
]

columns = ["submarine_id", "mission_name", "warheads_launched", "status"]
df = spark.createDataFrame(data, columns)
# df.show()
res_df = (
    df.select("submarine_id","mission_name","warheads_launched")
    .filter((col("warheads_launched")>3) & (col("status")=="Success"))
    .groupBy("submarine_id").agg(
        count("mission_name"),sum("warheads_launched").alias("total_warheads")
    ).orderBy(desc("total_warheads"))
)
res_df.show()

+------------+-------------------+--------------+
|submarine_id|count(mission_name)|total_warheads|
+------------+-------------------+--------------+
|      SUB-01|                  3|            16|
|      SUB-03|                  1|             6|
|      SUB-02|                  1|             5|
+------------+-------------------+--------------+

