In [None]:
# Databricks notebook source
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import requests
import json
from datetime import datetime

# Get (or reuse) the Spark session, registering the Delta Lake SQL extension
# and the Delta catalog implementation on the builder.
spark = (
    SparkSession.builder
    .appName("ESPNCollegeFootballStreaming")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Column layout of one scoreboard row; every column is declared nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("id", StringType()),
        ("name", StringType()),
        ("shortName", StringType()),
        ("status", StringType()),  # status is carried as a string, not a boolean
        ("date", TimestampType()),
        ("homeTeam", StringType()),
        ("homeScore", IntegerType()),
        ("awayTeam", StringType()),
        ("awayScore", IntegerType()),
    )
])

# Function to fetch data from ESPN API
def fetch_espn_data(payload=None):
    """Fetch the ESPN college-football scoreboard and flatten it into rows.

    Args:
        payload: Optional, already-decoded scoreboard JSON (a dict holding an
            ``events`` list). When omitted (the default) the scoreboard is
            downloaded from the ESPN API.

    Returns:
        A list of dicts with the keys ``id``, ``name``, ``shortName``,
        ``status``, ``date`` (timezone-aware UTC ``datetime``), ``homeTeam``,
        ``homeScore``, ``awayTeam`` and ``awayScore``. Events that cannot be
        parsed are reported and skipped. An empty list is returned when the
        download fails or the payload contains no events.
    """
    if payload is None:
        try:
            url = "https://site.api.espn.com/apis/site/v2/sports/football/college-football/scoreboard"
            response = requests.get(url, timeout=10)
            response.raise_for_status()  # Raise an exception for bad status codes
            payload = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers undecodable JSON bodies on requests versions
            # whose JSON error does not derive from RequestException.
            print(f"Error fetching data from ESPN API: {e}")
            return []

    # Tolerate a non-dict body as well as a missing or null "events" entry.
    events = payload.get('events') if isinstance(payload, dict) else None

    matches = []
    for event in events or []:
        try:
            competitors = event['competitions'][0]['competitors']

            # Prefer the explicit home/away marker on each competitor; fall
            # back to the positional order (home first) when it is absent.
            by_side = {c.get('homeAway'): c for c in competitors}
            home, away = by_side.get('home'), by_side.get('away')
            if home is None or away is None:
                home, away = competitors[0], competitors[1]

            matches.append({
                'id': event['id'],
                'name': event['name'],
                'shortName': event['shortName'],
                'status': event['status']['type']['name'],  # Treat status as string
                # %z consumes the trailing "Z", so the value is an aware UTC
                # datetime instead of a naive one that Spark would read as
                # local time.
                'date': datetime.strptime(event['date'], "%Y-%m-%dT%H:%M%z"),
                'homeTeam': home['team']['displayName'],
                'homeScore': int(home['score']),
                'awayTeam': away['team']['displayName'],
                'awayScore': int(away['score']),
            })
        except (KeyError, IndexError, ValueError, TypeError, AttributeError) as e:
            # TypeError/AttributeError: null or wrongly-typed fields, e.g. a
            # null score. One bad event must not discard the whole batch.
            print(f"Error processing event: {e}")
            continue

    return matches

# Function to process each batch of streaming data
def process_batch(df, epoch_id):
    """Refresh the Delta table with the current ESPN scoreboard.

    Called once per micro-batch by ``foreachBatch``. The incoming ``df`` is
    not read; each invocation pulls fresh data via ``fetch_espn_data`` and
    overwrites the target table with it. Any exception is reported with the
    batch id and not re-raised.

    Args:
        df: Micro-batch DataFrame supplied by the streaming query (unused).
        epoch_id: Identifier of the micro-batch, used in log messages.
    """
    target_table = "college_football_matches"
    try:
        records = fetch_espn_data()

        if records:
            # Materialise the rows with the module-level schema and replace
            # the table contents with this snapshot.
            snapshot = spark.createDataFrame(records, schema)
            writer = snapshot.write.format("delta").mode("overwrite")
            writer.saveAsTable(target_table)

            print(f"Batch {epoch_id} processed. Data written to SQL table {target_table}")
        else:
            print(f"No data to process for batch {epoch_id}")
    except Exception as err:
        print(f"Error processing batch {epoch_id}: {err}")

# Streaming source: the built-in "rate" generator at one row per second.
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

# Run process_batch on every micro-batch, triggered once a minute.
query = (
    streaming_df.writeStream
    .foreachBatch(process_batch)
    .outputMode("update")
    .trigger(processingTime='1 minutes')
    .start()
)

# Block until the streaming query terminates.
query.awaitTermination()

