In [0]:
%fs ls /FileStore/tables/HockeyTimeline/

path,name,size,modificationTime
dbfs:/FileStore/tables/HockeyTimeline/training_1910.csv,training_1910.csv,10947,1715898065000
dbfs:/FileStore/tables/HockeyTimeline/training_1920.csv,training_1920.csv,179545,1715898065000
dbfs:/FileStore/tables/HockeyTimeline/training_1930.csv,training_1930.csv,334747,1715898063000
dbfs:/FileStore/tables/HockeyTimeline/training_1940.csv,training_1940.csv,279062,1715898064000
dbfs:/FileStore/tables/HockeyTimeline/training_1950.csv,training_1950.csv,348686,1715898063000
dbfs:/FileStore/tables/HockeyTimeline/training_1960.csv,training_1960.csv,429525,1715898063000
dbfs:/FileStore/tables/HockeyTimeline/training_1970.csv,training_1970.csv,1043636,1715898064000
dbfs:/FileStore/tables/HockeyTimeline/training_1980.csv,training_1980.csv,1406180,1715898062000
dbfs:/FileStore/tables/HockeyTimeline/training_1990.csv,training_1990.csv,1607022,1715898063000
dbfs:/FileStore/tables/HockeyTimeline/training_2000.csv,training_2000.csv,1809227,1715898065000


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import sleep

inputPath = "/FileStore/tables/HockeyTimeline/"


# Define the schema to speed up processing
jsonSchema = StructType([ 
    StructField("season", IntegerType(), True),
    StructField("date", TimestampType(), True),
    StructField("home_team_abbr", StringType(), True),
    StructField("away_team_abbr", StringType(), True),
    StructField("home_team_pregame_rating", FloatType(), True),
    StructField("away_team_pregame_rating", FloatType(), True),
    StructField("home_team_winprob", FloatType(), True),
    StructField("away_team_winprob", FloatType(), True),
    StructField("overtime_prob", FloatType(), True),
    StructField("home_team_expected_points", FloatType(), True),
    StructField("away_team_expected_points", FloatType(), True),
    StructField("home_team_won", IntegerType(),True),
    StructField("decade", IntegerType(), True)
])



streamingInputDF = (
  spark
    .readStream
    .option("maxFilesPerTrigger", 1)
    .option("header", "true")
    .schema(jsonSchema)
    .format("csv")
    .load(inputPath)
)

In [0]:
query = (
    streamingInputDF
    .writeStream
    .format("memory")
    .queryName("complete")
    .trigger(processingTime='10 seconds')
    .start()
)

In [0]:
%sql select count(*) from complete

count(1)
17142


In [0]:
# media y la desviación estándar de la puntuación esperada del equipo local

In [0]:
%sql select avg(home_team_expected_points) as avg_home_expected_points, stddev(home_team_expected_points) as stddev_home_expected_points from complete

avg_home_expected_points,stddev_home_expected_points
1.2351535209668112,0.1851969009322317


In [0]:
#la fecha más antigua y más reciente en tu conjunto de datos

In [0]:
%sql SELECT MIN(date) AS min_date, MAX(date) AS max_date FROM complete;

min_date,max_date
1929-11-14T00:00:00.000+0000,2022-06-26T00:00:00.000+0000


In [0]:
# la diferencia promedio entre las probabilidades de ganar del equipo local y del equipo visitante

In [0]:
%sql SELECT season,
       AVG(home_team_expected_points - away_team_expected_points) AS avg_score_difference
FROM complete
GROUP BY season
ORDER BY season;


season,avg_score_difference
1930,0.2446814857680222
1931,0.2444339972936501
1932,0.2500882424959322
1933,0.2528003046522925
1934,0.2535800342952018
1935,0.2458361725619786
1936,0.2556600269762057
1937,0.2566583430367793
1938,0.2583941957698418
1939,0.2599753347483087


Databricks visualization. Run in Databricks to view.

In [0]:
# la proporción de juegos que tienen una probabilidad de tiempo extra mayor a cierto umbral

In [0]:
%sql SELECT AVG(CASE WHEN overtime_prob > 0.22 THEN 1 ELSE 0 END) AS overtime_prob_ratio
FROM complete;


overtime_prob_ratio
0.8599370547006236


In [0]:
# calcular el porcentaje de juegos ganados por el equipo local en cada década

In [0]:
%sql SELECT decade,
       AVG(CASE WHEN home_team_won = true THEN 1 ELSE 0 END) AS win_percentage
FROM complete
GROUP BY decade;

decade,win_percentage
1980,0.5262121046295283
1930,0.5222988505747126
2020,0.5332398316970547
1950,0.4988962472406181
1960,0.5295592977427446
1990,0.5018192263500575
2010,0.5460675919391516
1970,0.5271466509294778
1940,0.5455046883618312
1910,0.7285714285714285


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [0]:
# Paso 4: Preprocesar los datos en streaming
label_indexer = StringIndexer(inputCol="home_team_won", outputCol="label")
feature_columns = ["home_team_pregame_rating", "away_team_pregame_rating", "home_team_winprob", 
                   "away_team_winprob", "overtime_prob", "home_team_expected_points", 
                   "away_team_expected_points"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Crear un pipeline de preprocesamiento
preprocessing_pipeline = Pipeline(stages=[label_indexer, assembler])

# Paso 5: Entrenar y actualizar el modelo en tiempo real
def train_and_evaluate_model(microbatch_df, batch_id):
    # Preprocesar el microbatch
    preprocessed_data = preprocessing_pipeline.fit(microbatch_df).transform(microbatch_df)
    
    # Dividir los datos en conjuntos de entrenamiento y prueba
    train_data, test_data = preprocessed_data.randomSplit([0.8, 0.2], seed=1234)
    
    # Construir y entrenar el modelo
    lr = LogisticRegression(featuresCol="features", labelCol="label")
    lr_model = lr.fit(train_data)
    
    # Evaluar el modelo
    predictions = lr_model.transform(test_data)
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
    roc_auc = evaluator.evaluate(predictions)
    print(f"Area under ROC for batch {batch_id}: {roc_auc}")


    # Multiclass Classification Evaluator para precisión, recall y F1-score
    multi_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")
    
    accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
    precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
    f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

    print(f"Accuracy for batch {batch_id}: {accuracy}")
    print(f"Precision for batch {batch_id}: {precision}")
    print(f"Recall for batch {batch_id}: {recall}")
    print(f"F1-score for batch {batch_id}: {f1}")
    
    # Acceder a los parámetros del modelo
    print("Model parameters:")
    print(f"Coefficients: {lr_model.coefficients}")
    print(f"Intercept: {lr_model.intercept}")

# Aplicar la función de entrenamiento en cada microbatch
streaming_query = streamingInputDF.writeStream.foreachBatch(train_and_evaluate_model).start()

# Esperar a que la transmisión termine
streaming_query.awaitTermination()


Area under ROC for batch 0: 0.6353853752141523
Accuracy for batch 0: 0.6047671840354767
Precision for batch 0: 0.6039844432910839
Recall for batch 0: 0.6047671840354767
F1-score for batch 0: 0.6031505464224859
Model parameters:
Coefficients: [-0.0020178724478151612,0.001591696762640823,-8.389015554547072,8.38681508770717,6.480894061581912,3.9565221827605694,-3.6948711828597935]
Intercept: -1.0360210883428154
Area under ROC for batch 1: 0.6200465574357594
Accuracy for batch 1: 0.5901639344262295
Precision for batch 1: 0.5885382219148065
Recall for batch 1: 0.5901639344262295
F1-score for batch 1: 0.588972596083005
Model parameters:
Coefficients: [-0.005326446030418907,0.004448778384070917,-3.348293725681086,3.3451871769345973,11.845774284890236,1.3959111081445763,-1.6350059470519593]
Intercept: -1.2223061970536993
Area under ROC for batch 2: 0.6284243140226908
Accuracy for batch 2: 0.5754716981132075
Precision for batch 2: 0.5745458645882745
Recall for batch 2: 0.5754716981132075
F1-sco