Bronze Team & Position Extraction

In [12]:
# %% Cell 0: Pipeline mode (full vs. incremental)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Pipeline_Mode").getOrCreate()

# Try to pull from spark.conf; defaults to "incremental" if not set
mode = spark.conf.get("pipeline.parameters.mode", "incremental").strip().lower()

# ───► Uncomment one of these lines to force “full” or “incremental” locally:
# mode = "full"
mode = "incremental"

print(f"▶️ Pipeline running in {mode!r} mode")

StatementMeta(, 912a7d70-7b9e-40db-834e-458fa50668d2, 14, Finished, Available, Finished)

▶️ Pipeline running in 'full' mode


In [13]:
# %% Cell 1: Build team_ids based on mode
import requests
from pyspark.sql.functions import col, max as spark_max

bronze = spark.table("SASPProd.bronze_teams")
max_id = bronze.select(spark_max(col("TeamID").cast("int"))).first()[0] or 0

if mode == "full":
    team_ids = list(range(1, max_id + 1))
else:
    start_id = max(1, max_id - 100)
    team_ids = list(range(start_id, max_id + 1))

# Detect truly new IDs above max_id
new_ids = []
curr = max_id + 1
while True:
    r = requests.get(f"https://virtual.sssfonline.com/api/shot/sasp-team/{curr}")
    if r.status_code == 404:
        break
    r.raise_for_status()
    new_ids.append(curr)
    curr += 1

team_ids += new_ids

print(f"▶️ Refreshing TeamIDs ({mode}): {team_ids[:3]} ... {team_ids[-3:]} (total {len(team_ids)})")


StatementMeta(, 912a7d70-7b9e-40db-834e-458fa50668d2, 15, Finished, Available, Finished)

▶️ Refreshing TeamIDs (full): [1, 2, 3] ... [3837, 3838, 3839] (total 3839)


In [14]:
# %% Cell Y: Fetch teams and build a Spark DataFrame (explicit schema)

import requests
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType

# ------------------------------------------------------------
# 1️⃣ Re‐use the existing SparkSession (created earlier in your notebook).
spark = SparkSession.builder.getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS SASPProd")
# ------------------------------------------------------------

# 2️⃣ Loop over `team_ids` (defined in Cell 1) and fetch each team.
rows = []
for tid in team_ids:
    url = f"https://virtual.sssfonline.com/api/shot/sasp-team/{tid}"
    try:
        r = requests.get(url)
        if r.status_code == 200:
            data = r.json()
            # Flatten exactly the fields your bronze_teams schema expects:
            flat = {
                "TeamID":         str(data.get("id", "")),
                "TeamName":       data.get("name", ""),
                "Location":       data.get("location", ""),
                "PrimaryContact": data.get("contact", ""),
                # …add any other nested keys here if your schema has more columns…
                "SourceURL":      url,                    # ← Track which URL succeeded
            }
            rows.append(flat)
        else:
            print(f"⚠️  Team {tid} returned status {r.status_code} (URL: {url})")
    except Exception as e:
        print(f"⚠️  Error fetching Team {tid} from {url}: {e}")

# 3️⃣ Define an explicit Spark schema that matches these columns:
schema = StructType([
    StructField("TeamID",         StringType(), nullable=True),
    StructField("TeamName",       StringType(), nullable=True),
    StructField("Location",       StringType(), nullable=True),
    StructField("PrimaryContact", StringType(), nullable=True),
    StructField("SourceURL",      StringType(), nullable=True),
])

# 4️⃣ Build a pandas DataFrame with exactly these column names (in this order).
#    Even if `rows == []`, pandas will create an empty DataFrame with the correct columns.
columns = ["TeamID", "TeamName", "Location", "PrimaryContact", "SourceURL"]
pdf = pd.DataFrame(rows, columns=columns)

# 5️⃣ Convert pandas → Spark using the explicit schema.
#    If `rows` was empty, this still works (Spark doesn’t try to infer from data).
if pdf.shape[0] == 0:
    # No rows: create an empty DataFrame with the schema
    spark_df = spark.createDataFrame([], schema=schema)
else:
    # At least 1 row: convert using the same schema
    spark_df = spark.createDataFrame(pdf, schema=schema)

# 6️⃣ Add an ingest timestamp column
spark_df = spark_df.withColumn("ingest_ts", current_timestamp())

# 7️⃣ Show exactly which TeamID + SourceURL rows are about to be upserted
print("🚀  The following teams (by SourceURL) will be upserted into bronze_teams:")
spark_df.select("TeamID", "SourceURL").show(truncate=False)

# 9️⃣ Sanity check: print schema & sample rows
spark_df.printSchema()
spark_df.show(10, truncate=False)

StatementMeta(, 912a7d70-7b9e-40db-834e-458fa50668d2, 16, Finished, Available, Finished)

⚠️  Team 1 returned status 404 (URL: https://virtual.sssfonline.com/api/shot/sasp-team/1)
⚠️  Team 2 returned status 404 (URL: https://virtual.sssfonline.com/api/shot/sasp-team/2)
⚠️  Team 3 returned status 404 (URL: https://virtual.sssfonline.com/api/shot/sasp-team/3)
⚠️  Team 4 returned status 404 (URL: https://virtual.sssfonline.com/api/shot/sasp-team/4)
⚠️  Team 5 returned status 404 (URL: https://virtual.sssfonline.com/api/shot/sasp-team/5)
⚠️  Team 6 returned status 404 (URL: https://virtual.sssfonline.com/api/shot/sasp-team/6)
⚠️  Team 7 returned status 404 (URL: https://virtual.sssfonline.com/api/shot/sasp-team/7)
⚠️  Team 8 returned status 404 (URL: https://virtual.sssfonline.com/api/shot/sasp-team/8)
⚠️  Team 9 returned status 404 (URL: https://virtual.sssfonline.com/api/shot/sasp-team/9)
⚠️  Team 10 returned status 404 (URL: https://virtual.sssfonline.com/api/shot/sasp-team/10)
⚠️  Team 11 returned status 404 (URL: https://virtual.sssfonline.com/api/shot/sasp-team/11)
⚠️  Te

In [15]:
# %% Cell Z: Upsert (merge) these back into bronze_teams

from delta.tables import DeltaTable
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# 1️⃣ Check if spark_df is empty. If so, skip the merge.
#    This prevents errors when there are no new teams to insert/update.
row_count = spark_df.count()
if row_count == 0:
    print("ℹ️ spark_df is empty. No new teams to upsert.")
else:
    # 2️⃣ Narrow spark_df down to only the columns in bronze_teams.
    #    Here we assume the target schema for SASPProd.bronze_teams is:
    #      TeamID, TeamName, Location, PrimaryContact, ingest_ts
    #    (If your bronze_teams has additional columns, include them here as needed.)
    upsert_df = spark_df.select(
        col("TeamID"),
        col("TeamName"),
        col("Location"),
        col("PrimaryContact"),
        col("ingest_ts")
    )

    # 3️⃣ Load the existing bronze_teams Delta table
    bronze = DeltaTable.forName(spark, "SASPProd.bronze_teams")

    # 4️⃣ Perform the merge using only matching columns.
    #    WhenMatchedUpdateAll/WhenNotMatchedInsertAll will now only
    #    try to update/insert the five columns defined above.
    (
        bronze.alias("b")
              .merge(
                  upsert_df.alias("u"),
                  "b.TeamID = u.TeamID"      # key‐column for matching
              )
              .whenMatchedUpdateAll()
              .whenNotMatchedInsertAll()
              .execute()
    )

    print(f"✅ Upserted {row_count} teams into bronze_teams")


StatementMeta(, 912a7d70-7b9e-40db-834e-458fa50668d2, 17, Finished, Available, Finished)

ℹ️ spark_df is empty. No new teams to upsert.
