In [0]:
# Make `lahman` the session's default database so unqualified table names resolve there
spark.sql("USE lahman")

# Echo the active database back as a quick sanity check
active_db = spark.catalog.currentDatabase()
print("Current DB:", active_db)

In [0]:
from pyspark.sql import functions as F

# Load the batting rows that feed the team-season totals
bat = spark.table("lahman.batting")

# (source column, output column) for every counting stat summed per (teamID, yearID)
stat_aliases = [
    ("H", "H"),
    ("BB", "BB"),
    ("HBP", "HBP"),
    ("AB", "AB"),
    ("2B", "Doubles"),
    ("3B", "Triples"),
    ("HR", "HR"),
    ("SF", "SF"),
]
stat_totals = [F.sum(F.col(src)).alias(dst) for src, dst in stat_aliases]
stat_names = [dst for _, dst in stat_aliases]

# Column expressions over the aggregated totals, named once and reused below
singles = F.col("H") - F.col("Doubles") - F.col("Triples") - F.col("HR")
total_bases = F.col("Singles") + 2 * F.col("Doubles") + 3 * F.col("Triples") + 4 * F.col("HR")
times_on_base = F.col("H") + F.col("BB") + F.col("HBP")
obp_den = F.col("AB") + F.col("BB") + F.col("HBP") + F.col("SF")
slg_den = F.col("AB")

team_year = (
    bat.groupBy("teamID", "yearID")
    .agg(*stat_totals)
    # A sum over all-NULL inputs is NULL; zero those out so the arithmetic below stays defined
    .fillna(0, subset=stat_names)
    .withColumn("Singles", singles)
    .withColumn("TB", total_bases)
    # when() without otherwise() yields NULL, so a zero denominator produces a NULL rate
    .withColumn("OBP", F.when(obp_den != 0, times_on_base / obp_den))
    .withColumn("SLG", F.when(slg_den != 0, F.col("TB") / slg_den))
    # OPS is NULL whenever either component is NULL
    .withColumn("OPS", F.col("OBP") + F.col("SLG"))
    .select(
        "yearID", "teamID",
        "AB", "H", "BB", "HBP", "SF",
        "Doubles", "Triples", "HR", "Singles", "TB",
        "OBP", "SLG", "OPS",
    )
)

# Persist the result as Delta files, then register a table over that location
delta_path = "/FileStore/lahman/delta/team_year_ops"

delta_writer = (
    team_year.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
delta_writer.save(delta_path)

# IF NOT EXISTS: on re-runs the table is already registered and only the files above change
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS lahman.team_year_ops
  USING DELTA
  LOCATION '{delta_path}'
""")

print("Wrote lahman.team_year_ops")

# Preview the 20 highest-OPS team seasons from the registered table
top_ops = spark.table("lahman.team_year_ops").orderBy(F.col("OPS").desc()).limit(20)
display(top_ops)

In [0]:
from pyspark.sql import functions as F

spark.sql("USE lahman")

# Inputs: the team-season OPS table and a (yearID, teamID) -> name lookup from lahman.teams
ops = spark.table("lahman.team_year_ops").alias("o")
teams = spark.table("lahman.teams").select("yearID","teamID","name").alias("t")

join_keys = ["yearID", "teamID"]
stat_columns = [
    "AB", "H", "BB", "HBP", "SF",
    "Doubles", "Triples", "HR", "Singles", "TB",
    "OBP", "SLG", "OPS",
]

# Left join keeps every OPS row even when no matching teams row exists (teamName is then NULL)
ops_named = ops.join(teams, on=join_keys, how="left").select(
    *join_keys,
    F.col("name").alias("teamName"),
    *stat_columns,
)

# Expose the named result to SQL under a temp view for quick queries
ops_named.createOrReplaceTempView("vw_team_year_ops_named")
print("Created temp view: vw_team_year_ops_named")

# Preview: top 20 team seasons by OPS, read back through the view
top_named = spark.sql("""
  SELECT yearID, teamID, teamName, OPS, OBP, SLG, HR, AB
  FROM vw_team_year_ops_named
  ORDER BY OPS DESC
  LIMIT 20
""")
display(top_named)

In [0]:
%sql
USE lahman;

-- Example: 2024 team OPS leaderboard (edit the yearID filter to view another season).
-- Reads the temp view vw_team_year_ops_named, so the cell that creates that view
-- must have been run earlier in this session.
SELECT yearID, teamID, teamName, OPS, OBP, SLG, HR, AB
FROM vw_team_year_ops_named
WHERE yearID = 2024
ORDER BY OPS DESC;