In [0]:
%run "../includes/configurations"

#Drivers df

In [0]:
drivers = spark.read.format("delta").load(f"{processed_folder_path}/drivers").drop('number')

In [0]:
drivers = drivers.drop('name')

##Results df

In [0]:
driver_result = spark.read.format("delta").load(f"{processed_folder_path}/results").drop("ingestion_date" , "data_source", 'number')

In [0]:
display(driver_result)

##Races

In [0]:
races = spark.read.format("delta").load(f"{processed_folder_path}/races").drop("ingestion_date" , "data_source")

In [0]:
from pyspark.sql.functions import to_date, year

races_date = races.select('date', 'circuit_id', 'race_id')

# Eliminating 2022 and 2023
races_date = races_date.filter((year(races_date['date']) != 2023) & (year(races_date['date']) != 2022))

##Circuits

In [0]:
circuits = spark.read.format("delta").load(f"{processed_folder_path}/circuits")
    #.drop("ingestion_date" , "data_source")\


##Driver Standings

In [0]:
from pyspark.sql.functions import to_date, year

driver_standings = spark.read.format("delta").load(f"{processed_folder_path}/driver_standings")
    #.drop("ingestion_date" , "data_source", 'driverStandings_id')\


##Lap times

In [0]:
lap_times = spark.read.format("delta").load(f"{processed_folder_path}/lap_times").withColumnRenamed('miliseconds', 'milliseconds')
lap_times = lap_times[['race_id', 'driver_id', 'lap', 'milliseconds']]


##Qualifying

In [0]:
qualifying = spark.read.format("delta").load(f"{processed_folder_path}/qualifying")
qualifying = qualifying[['driver_id', 'position']]

In [0]:
import pandas as pd
import numpy as np

#visualization tools
import matplotlib.pyplot as plt

import seaborn as sns
from datetime import datetime
sns.set()

from os import listdir
from os.path import isfile, join

In [0]:
#Merge Dataframes

from pyspark.sql.functions import col

driver_result_df = driver_result
races_df = races
drivers_df = drivers

df1 = driver_result_df.join(races_df, on='race_id').dropDuplicates()
df_race = df1.join(drivers_df, on='driver_id').dropDuplicates()
display(df_race)

In [0]:
#Drop posterior data column 

df_race = df_race.drop('laps', 'milliseconds', 'fastest_lap', 'fastest_lap_time', 'fastest_lap_speed', 'time_x', 'time_y', 'position_order' ,'ingestion_date', 'data_source')

In [0]:
df_race = df_race.drop('position', 'nationality', 'name', 'position', 'position_text', 'race_timestamp', 'driver_ref' ,'time')

In [0]:
df_race= df_race.withColumnRenamed('race_year', 'year')

In [0]:
display(df_race)

##Add age

In [0]:
from pyspark.sql.functions import datediff, current_date, round, year


#Add age column to dataframe
df_race = df_race.withColumn('age', round(datediff(current_date(), 'dob') / 365))

display(df_race)

##Winning a race

In [0]:
from pyspark.sql.functions import  lit
df_driver= drivers
#df_driver = df_driver.withColumn('totalWins', lit(0))

In [0]:

races_date_w= races_date.select('race_id', 'date')
driver_standings = driver_standings.join(races_date, on='race_id', how='left').dropDuplicates()

display(driver_standings)

In [0]:
# Convert the "date" column to a datetime object
from pyspark.sql.functions import to_timestamp, year, lit

#driver_standings = driver_standings.withColumn('date', lit(0))
driver_standings= driver_standings.withColumn("date",to_timestamp(driver_standings.date))

In [0]:
#count the number of races each driven in
from pyspark.sql.functions import countDistinct

num_races_per_driver_df = driver_standings.groupBy('driver_id') \
                                           .agg(countDistinct('race_id').alias('totalRaces'))


In [0]:
from pyspark.sql.functions import year, max, col, count


df_driver_final = df_driver.join(driver_standings, on='driver_id', how='left')


In [0]:

# Group the dataframe by year and find the maximum date for each year
from pyspark.sql.functions import year, max

latest_day_in_year = df_driver_final.groupby((df_driver_final['driver_id']),
 df_driver_final['date']).agg({'date': 'max'})


In [0]:
from pyspark.sql.functions import year, max, sum, when

# Extract year from 'date' column
df_driver_fl = df_driver_final.withColumn('year', year('date'))

# Get the latest date for each year
latest_day_in_year = df_driver_fl.groupby('year').agg(max('date').alias('latest_date')).collect()

# Convert latest_day_in_year to list of dates
latest_day_in_year = [row.latest_date for row in latest_day_in_year]

# Filter dataframe to include only rows with latest dates for each year
filtered_dataframe = df_driver_fl.filter(col('date').isin(latest_day_in_year))

# Calculate total wins for each driver
total_wins_per_driver = filtered_dataframe.groupby('driver_id').agg(sum('wins').alias('totalWins'))

# Join total_wins_per_driver with df_driver on driverId column
df_driver = df_driver.join(total_wins_per_driver, on='driver_id', how='left_outer')


In [0]:
# adding total races to each driver
df_driver = df_driver.join(num_races_per_driver_df,on='driver_id', how="left")


In [0]:
df_driver = df_driver.withColumn('winRate', col('totalWins') / col('totalRaces')) \
                     .withColumn('age', 2023 - year('dob'))


In [0]:
display(df_driver)


In [0]:
df_driver= df_driver.drop('ingestion_date', 'data_source', 'totalWins')

###fastestLapRate - Likelihood of winning fastest lap

In [0]:
from pyspark.sql.functions import col, min

# Group the DataFrame by 'raceId' and find the row with the minimum 'milliseconds'

idx = lap_times.groupby('race_id').agg(min('milliseconds').alias('min_milliseconds')).select('race_id', 'min_milliseconds')
#display(idx)

In [0]:

df_min_milliseconds = lap_times.join(idx, (lap_times.race_id == idx.race_id) & (lap_times.milliseconds == idx.min_milliseconds )).select(lap_times['*'])

# Sort the result by 'raceId'
df_min_milliseconds = df_min_milliseconds.sort('race_id')

# Calculate the number of fastest laps for each 'driverId'
counts = df_min_milliseconds.groupby('driver_id').count().select(col('driver_id'), col('count').alias('totalFastestLaps'))

# Add the 'totalFastestLaps' column to 'df_driver'
df_driver = df_driver.join(counts, on='driver_id', how='left').fillna(0)

# Calculate the 'fastestLapRate'
df_driver = df_driver.withColumn('fastestLapRate', col('totalFastestLaps') / col('totalRaces')).drop('totalFastestLaps')

# Print the current DataFrame
df_driver.show()


+---------+-------------+----+--------+-------------+----------+-------------+----------+--------------------+---+--------------------+
|driver_id|   driver_ref|code|forename|      surname|       dob|  nationality|totalRaces|             winRate|age|      fastestLapRate|
+---------+-------------+----+--------+-------------+----------+-------------+----------+--------------------+---+--------------------+
|      148|       foitek|  \N|  Gregor|       Foitek|1965-03-27|        Swiss|        32|                 0.0| 58|                 0.0|
|      463|  chamberlain|  \N|     Jay|  Chamberlain|1925-12-29|     American|         5|                 0.0| 98|                 0.0|
|      471|    johnstone|  \N|   Bruce|    Johnstone|1937-01-30|South African|         1|                 0.0| 86|                 0.0|
|      496|   menditeguy|  \N|  Carlos|   Menditeguy|1914-08-10|    Argentine|        62|                 0.0|109|                 0.0|
|      833|        merhi| MER| Roberto|        M

In [0]:
#display(df_driver)

##fastest Qualifying

In [0]:
from pyspark.sql.functions import col, count

# COUNTING THE NUMBER OF QUALIFYING WINS
# Filter for position == 1 and count occurrences grouped by driverId
position_1_counts = qualifying.filter(col('position') == 1).groupby('driver_id').agg(count('position').alias('position_1_count'))

# Merge with df_driver using a left join and fill in missing values with 0
df_driver = df_driver.join(position_1_counts, on='driver_id', how='left').fillna({'position_1_count': 0})

# Calculate qualifyingWinRate and drop 'position_1_count'
df_driver = df_driver.withColumn('qualifyingWinRate', col('position_1_count') / col('totalRaces')).drop('position_1_count')

#display(df_driver)


#Exploratory Data Analysis

In [0]:
print("Number of data point for race dataframe: " + str(df_race.count()))
print("Number of data point for driver dataframe: " + str(df_driver.count()))


Number of data point for race dataframe: 25749
Number of data point for driver dataframe: 857


In [0]:
print("Data frame race information")
df_race.printSchema()

print('\n\n')
print("Data frame driver information")
df_driver.printSchema()


Data frame race information
root
 |-- driver_id: integer (nullable = true)
 |-- race_id: integer (nullable = true)
 |-- result_id: integer (nullable = true)
 |-- constructor_id: integer (nullable = true)
 |-- grid: integer (nullable = true)
 |-- points: float (nullable = true)
 |-- rank: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- round: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- circuit_id: integer (nullable = true)
 |-- code: string (nullable = true)
 |-- forename: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- age: double (nullable = true)




Data frame driver information
root
 |-- driver_id: integer (nullable = true)
 |-- driver_ref: string (nullable = true)
 |-- code: string (nullable = true)
 |-- forename: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- nationality: string (nullable = true)
 |-- totalRaces: long (nullable = tr

In [0]:
display(df_race)

##plotting the distributions of the level variables

In [0]:
import matplotlib.pyplot as plt
import pandas as pd

# Convert the PySpark dataframe to a Pandas dataframe
df_numeric = df_race.select('grid', 'points', 'year', 'round', 'age').toPandas()

# Create a subplot with 9 rows and 3 columns
fig, axes = plt.subplots(9, 3, figsize=(20, 24))

# Loop through each subplot and plot the corresponding data
for i, ax in enumerate(axes.flat):
    if i < len(df_numeric.columns):
        ax.hist(df_numeric.iloc[:, i], bins=20)
        ax.set_xlabel(df_numeric.columns[i])
        ax.set_ylabel('Frequency')
    else:
        ax.axis('off')

plt.tight_layout()
plt.show()


#2

In [0]:
from pyspark.sql.functions import col
import matplotlib.pyplot as plt
import seaborn as sns

# Select the required columns
df_cat = df_race.select('grid', 'points', 'age', 'rank')

# Plot the distributions of the level variables
fig, axes = plt.subplots(2, 2, figsize=(20, 20))
sns.histplot(ax=axes[0, 0], data=df_cat.toPandas(), x='grid', bins=20, kde=True)
sns.histplot(ax=axes[0, 1], data=df_cat.toPandas(), x='points', bins=20, kde=True)
sns.histplot(ax=axes[1, 0], data=df_cat.toPandas(), x='age', bins=20, kde=True)
sns.histplot(ax=axes[1, 1], data=df_cat.toPandas(), x='rank', bins=20, kde=True)
plt.show()


##Format the layout so that no overlapping between titles and graphs

In [0]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# Convert the PySpark dataframes to Pandas dataframes
df_numeric = df_driver.select('totalRaces', 'winRate', 'fastestLapRate', 'qualifyingWinRate').toPandas()
df_numeric_1 = df_driver.select('totalRaces', 'winRate', 'fastestLapRate', 'qualifyingWinRate').toPandas()

# Create a subplot with 8 rows and 3 columns
fig, axes = plt.subplots(8, 3, figsize=(20, 24))

# Loop through each subplot and plot the corresponding data
count = 0
for i in range(len(df_numeric.columns)):
    var = df_numeric.columns[i]
    sns.histplot(data=df_numeric[var], ax=axes[count, 0])
    sns.boxplot(data=df_numeric[var], orient="h", ax=axes[count, 1])
    sns.violinplot(data=df_numeric[var], orient="h", ax=axes[count, 2])
    count += 1

for i in range(len(df_numeric_1.columns)):
    var = df_numeric_1.columns[i]
    sns.histplot(data=df_numeric_1[var], ax=axes[count, 0])
    sns.boxplot(data=df_numeric_1[var], orient="h", ax=axes[count, 1])
    sns.violinplot(data=df_numeric_1[var], orient="h", ax=axes[count, 2])
    count += 1

plt.tight_layout()
plt.show()


## Plotting the correlation matrix

In [0]:
# Import necessary libraries
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt


# Select the required columns
df_corr = df_race.select('grid', 'points', 'age', 'rank')

# Calculate the correlation matrix
corr_matrix = np.corrcoef(df_corr.toPandas().values.T)
corr = pd.DataFrame(data=corr_matrix, columns=['grid', 'points', 'age', 'rank'], index=['grid', 'points', 'age', 'rank'])

# Plot the correlation matrix
sns.set(style='white')
mask = np.zeros_like(corr, dtype=np.bool)
mask[np.triu_indices_from(mask)] = True
f, ax = plt.subplots(figsize=(7, 5))
cmap = sns.diverging_palette(220, 10, as_cmap=True)
sns.heatmap(corr, mask=mask, cmap=cmap, vmax=.3, center=0, square=True, linewidths=.5, cbar_kws={"shrink": .5})
plt.show()


##Categorical Value

In [0]:
#df_race.describe().display()

In [0]:
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

df_race_p = df_race.na.drop()
# Convert the Spark DataFrame to a Pandas DataFrame
df_race_p = df_race_p.toPandas()

# Select the required columns
df_cat = df_race_p[['grid', 'points', 'age', 'rank']]

# Convert the selected columns to the correct data types if necessary
df_cat['rank'] = df_cat['rank'].astype(str)
df_cat['grid'] = df_cat['grid'].astype(int)
df_cat['points'] = df_cat['points'].astype(int)
df_cat['age'] = df_cat['age'].astype(int)

# Create the subplots
f, axes = plt.subplots(3, 1, figsize=(20, 30))

# Loop over the columns and plot the box plots
count = 0
for col in df_cat.columns:
    if col != 'rank':
        sns.boxplot(data=df_cat, x=col, y='rank', order=['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20'], ax=axes[count])
        count += 1

# Show the plot
plt.show()


In [0]:
#df_race.describe().display()

##heat map

In [0]:
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

df_race_temp = df_race.drop('date', 'code', 'forename', 'surname', 'dob')
# convert all columns to double type
df_race_temp = df_race_temp.select([F.col(c).cast(DoubleType()) for c in df_race_temp.columns])

# convert Spark DataFrame to Pandas DataFrame
pd_df = df_race_temp.toPandas()

# plot the heatmap
plt.figure(figsize=(17,12))
sns.heatmap(pd_df.corr(),annot=True)


#Clasify race as the first half and second half by a new variable first_half

In [0]:
from pyspark.sql.functions import month, col, sum, avg, when
from pyspark.sql import Row
import matplotlib.pyplot as plt

df = df_race

# Calculate the firstHalf column with real values
df = df.withColumn("date", df["date"].cast("date"))
df = df.withColumn("firstHalf", (month(col("date")) <= 6).cast("integer"))

# Group by year, first half, and driverId and calculate the sum of points
point_year_divided = df.groupby("year", "firstHalf", "driver_id")\
    .agg((when(col("firstHalf") == 0, sum("points")).otherwise('')).alias("points"))

# Calculate the average age for each year, first half, and driverId
age_year = df.groupby("year", "firstHalf", "driver_id").agg(avg("age").alias("avg_age"), F.mean("points").alias("points"))


In [0]:
display(point_year_divided)

In [0]:
point_first_half_all = []
whole_year_point_all = []

for column in point_year_divided.columns[3:]:
    point_year_driver_divided = point_year_divided.select("year", "firstHalf", "driver_id", column).dropna(subset=[column])
    age_one_year = age_year.select("year", "firstHalf", "driver_id", "avg_age").join(point_year_driver_divided, ["year", "firstHalf", "driver_id"], "inner")

    new_df = age_one_year.withColumn("ages", when(col("avg_age").isNull(), col(column)).otherwise(col("avg_age"))) \
        .select("firstHalf", "ages", "points", (col(column) + col("points")).alias("whole_year_point"))

    point_first_half_all.extend(new_df.select("firstHalf", "ages", "points").rdd.collect())
    whole_year_point_all.extend(new_df.select("whole_year_point", "ages", "firstHalf").rdd.collect())

# Convert Python lists to DataFrames
point_first_half_all_df = spark.createDataFrame([Row(firstHalf=row[0], ages=row[1], points=row[2]) for row in point_first_half_all])
whole_year_point_all_df = spark.createDataFrame([Row(whole_year_point=row[0], ages=row[1], firstHalf=row[2]) for row in whole_year_point_all])

# Plot the scatterplot using matplotlib
import matplotlib.pyplot as plt

plt.scatter(point_first_half_all_df.select("points").rdd.flatMap(lambda x: x).collect(),
            whole_year_point_all_df.select("whole_year_point").rdd.flatMap(lambda x: x).collect())
plt.xlabel("First Half Points")
plt.ylabel("Whole Year Points")
plt.show()




In [0]:
display(point_first_half_all_df)

#Supervised method

##Linear Regression

In [0]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
new_df_1 = new_df.na.drop()
# Train and Test Split
train_df, test_df = new_df_1.randomSplit([0.8, 0.2], seed=42)

# Prepare the feature column
feature_cols = ["firstHalf"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_features = assembler.transform(train_df)
test_features = assembler.transform(test_df)

# Prepare the target column
target_col = "whole_year_point"
train_target = train_df.select(target_col)
test_target = test_df.select(target_col)

# Train the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol=target_col, maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_features)

# Make predictions on the train and test data
train_predictions = lr_model.transform(train_features)
test_predictions = lr_model.transform(test_features)

# Create the evaluator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=target_col)

# Train dataset evaluation
train_r2 = evaluator.evaluate(train_predictions, {evaluator.metricName: "r2"})
train_mse = evaluator.evaluate(train_predictions, {evaluator.metricName: "mse"})

# Test dataset evaluation
test_r2 = evaluator.evaluate(test_predictions, {evaluator.metricName: "r2"})
test_mse = evaluator.evaluate(test_predictions, {evaluator.metricName: "mse"})

# Print evaluation results
print("Goodness of Fit of Model \tTrain Dataset")
print("Explained Variance (R^2) \t:", train_r2)
print("Mean Squared Error (MSE) \t:", train_mse)
print()

print("Goodness of Fit of Model and Prediction Accuracy \tTest Dataset")
print("Explained Variance (R^2) \t:", test_r2)
print("Mean Squared Error (MSE) \t:", test_mse)
print()

# Print intercept and coefficients
print("Intercept of Regression \t: b =", lr_model.intercept)
print("Coefficients of Regression \t: a =", lr_model.coefficients)


Goodness of Fit of Model 	Train Dataset
Explained Variance (R^2) 	: 1.8540724511240114e-14
Mean Squared Error (MSE) 	: 3722.9868754835215

Goodness of Fit of Model and Prediction Accuracy 	Test Dataset
Explained Variance (R^2) 	: -0.001432018698080073
Mean Squared Error (MSE) 	: 3397.7708448221374

Intercept of Regression 	: b = 23.08042676996884
Coefficients of Regression 	: a = [0.0]


#2

In [0]:
from pyspark.sql.functions import col

df_liniar = new_df
data = df_liniar.select("firstHalf", "whole_year_point")
data = data.withColumnRenamed("firstHalf", "features")
data = data.withColumnRenamed("whole_year_point", "label")
data = data.filter(col("features").isNotNull() & col("label").isNotNull())  # Filter out rows with null values
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)
assembler = VectorAssembler(inputCols=["features"], outputCol="features_vec")
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)
lr = LinearRegression(featuresCol="features_vec", labelCol="label")
lr_model = lr.fit(train_data)
train_predictions = lr_model.transform(train_data)
test_predictions = lr_model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

# Train dataset evaluation
train_r2 = evaluator.evaluate(train_predictions, {evaluator.metricName: "r2"})
train_mse = evaluator.evaluate(train_predictions, {evaluator.metricName: "mse"})

# Test dataset evaluation
test_r2 = evaluator.evaluate(test_predictions, {evaluator.metricName: "r2"})
test_mse = evaluator.evaluate(test_predictions, {evaluator.metricName: "mse"})
print("Goodness of Fit of Model \tTrain Dataset")
print("Explained Variance (R^2) \t:", train_r2)
print("Mean Squared Error (MSE) \t:", train_mse)
print()

print("Goodness of Fit of Model and Prediction Accuracy \tTest Dataset")
print("Explained Variance (R^2) \t:", test_r2)
print("Mean Squared Error (MSE) \t:", test_mse)

print("Intercept of Regression \t: b =", lr_model.intercept)
print("Coefficients of Regression \t: a =", lr_model.coefficients)


Goodness of Fit of Model 	Train Dataset
Explained Variance (R^2) 	: 2.4424906541753444e-15
Mean Squared Error (MSE) 	: 3693.029224305617

Goodness of Fit of Model and Prediction Accuracy 	Test Dataset
Explained Variance (R^2) 	: -0.0013070970115658387
Mean Squared Error (MSE) 	: 3534.999721331512
Intercept of Regression 	: b = 22.30073714857536
Coefficients of Regression 	: a = [0.0]


#Polynomial Regression

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, PolynomialExpansion
from pyspark.ml.evaluation import RegressionEvaluator


# Load the data
df_polynomial = new_df.na.drop()

# Prepare the data
data = df_polynomial.select("firstHalf", "whole_year_point")
data = data.withColumnRenamed("firstHalf", "features")
data = data.withColumnRenamed("whole_year_point", "label")

# Split the data into train and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Create the feature vector assembler
assembler = VectorAssembler(inputCols=["features"], outputCol="features_vec")
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

# Perform polynomial expansion
polynomial_expansion = PolynomialExpansion(inputCol="features_vec", outputCol="poly_features", degree=2)
train_data_poly = polynomial_expansion.transform(train_data)
test_data_poly = polynomial_expansion.transform(test_data)

# Create the linear regression model
lr = LinearRegression(featuresCol="poly_features", labelCol="label")
lr_model = lr.fit(train_data_poly)

# Make predictions on train and test data
train_predictions = lr_model.transform(train_data_poly)
test_predictions = lr_model.transform(test_data_poly)

# Create the evaluation object
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

# Train dataset evaluation
train_r2 = evaluator.evaluate(train_predictions, {evaluator.metricName: "r2"})
train_mse = evaluator.evaluate(train_predictions, {evaluator.metricName: "mse"})

# Test dataset evaluation
test_r2 = evaluator.evaluate(test_predictions, {evaluator.metricName: "r2"})
test_mse = evaluator.evaluate(test_predictions, {evaluator.metricName: "mse"})

# Print the results
print("Goodness of Fit of Model \tTrain Dataset")
print("Explained Variance (R^2) \t:", train_r2)
print("Mean Squared Error (MSE) \t:", train_mse)
print()

print("Goodness of Fit of Model and Prediction Accuracy \tTest Dataset")
print("Explained Variance (R^2) \t:", test_r2)
print("Mean Squared Error (MSE) \t:", test_mse)

print("Intercept of Regression \t: b =", lr_model.intercept)
print("Coefficients of Regression \t: a =", lr_model.coefficients)


Goodness of Fit of Model 	Train Dataset
Explained Variance (R^2) 	: 2.4424906541753444e-15
Mean Squared Error (MSE) 	: 3693.029224305617

Goodness of Fit of Model and Prediction Accuracy 	Test Dataset
Explained Variance (R^2) 	: -0.0013070970115658387
Mean Squared Error (MSE) 	: 3534.999721331512
Intercept of Regression 	: b = 22.30073714857536
Coefficients of Regression 	: a = [0.0,0.0]


#Second track

In [0]:
driver_result_withdate_divided = df

In [0]:
#looop
from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

# Filter the DataFrame
driver_result_withdate_divided = driver_result_withdate_divided.filter(col("points") <= 20)


In [0]:
# Extract unique driver IDs
driver_ids = driver_result_withdate_divided.select("driver_id").distinct().rdd.flatMap(lambda x: x).collect()


In [0]:
from pyspark.sql.functions import col, avg, expr, lag
from pyspark.sql.window import Window

last_ten_race_results = []
current_race_results = []
desired_iterations = 50  # Specify the desired number of iterations

for driver_id in driver_ids:
    temp_df = driver_result_withdate_divided.filter(col("driver_id") == driver_id)
    temp_df = temp_df.sort("date")
    
    if temp_df.count() <= (desired_iterations + 10):  # Check if the number of rows is sufficient
        continue
    
    windowSpec = Window.orderBy("date")
    temp_df = temp_df.withColumn("points_lag", lag(col("points")).over(windowSpec))
    temp_df = temp_df.withColumn("age_lag", lag(col("age")).over(windowSpec))
    temp_df = temp_df.withColumn("round_lag", lag(col("round")).over(windowSpec))
    temp_df = temp_df.withColumn("grid_lag", lag(col("grid")).over(windowSpec))
    temp_df = temp_df.withColumn("raceId_lag", lag(col("race_id")).over(windowSpec))
    
    temp_df = temp_df.filter(col("points_lag").isNotNull())
    
    temp_df = temp_df.withColumn("last_ten_race_results", expr("array(points_lag, age_lag, round_lag, grid_lag, raceId_lag)"))
    temp_df = temp_df.withColumn("current_race_result", avg(col("points")).over(Window.partitionBy().orderBy("date").rowsBetween(-9, 0)))
    
    temp_df = temp_df.filter(col("current_race_result").isNotNull())
    
    last_ten_race_results.extend(temp_df.select("last_ten_race_results").collect())
    current_race_results.extend(temp_df.select("current_race_result").collect())
    
    if len(last_ten_race_results) >= desired_iterations:
        break


In [0]:
assert len(last_ten_race_results) == len(current_race_results)


In [0]:
#split the data
from pyspark.sql.functions import rand
from pyspark.ml.linalg import Vectors

# Combine the features and labels into a single DataFrame
data = zip(last_ten_race_results, current_race_results)
data_df = spark.createDataFrame(data, ["features", "label"])

# Randomly split the data into train and test sets
train_ratio = 0.8
test_ratio = 1 - train_ratio
seed = 42  # Set a seed for reproducibility

train_data, test_data = data_df.randomSplit([train_ratio, test_ratio], seed=seed)

# Extract the feature and label columns from the train and test sets
train_x = train_data.select("features").rdd.map(lambda x: x[0]).collect()
train_y = train_data.select("label").rdd.map(lambda x: x[0]).collect()

test_x = test_data.select("features").rdd.map(lambda x: x[0]).collect()
test_y = test_data.select("label").rdd.map(lambda x: x[0]).collect()


In [0]:
import numpy as np
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_squared_error

# Convert train_x and train_y to numpy arrays
train_x = np.array(train_x)
train_y = np.array(train_y)

# Reshape train_x to 2D array
train_x = train_x.reshape((train_x.shape[0], -1))

# Create and train the MLPRegressor
mlp_regressor = MLPRegressor(hidden_layer_sizes=(30, 20, 30, 10, 20, 40), max_iter=300)
mlp_regressor.fit(train_x, train_y)

# Convert test_x to numpy array
test_x = np.array(test_x)

# Reshape test_x to 2D array
test_x = test_x.reshape((test_x.shape[0], -1))

# Make predictions on the test set
test_predictions = mlp_regressor.predict(test_x)

# Evaluate the model using RMSE
rmse = mean_squared_error(test_y, test_predictions, squared=False)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse:.3f}")


Root Mean Squared Error (RMSE) on test data: 0.374


  y = column_or_1d(y, warn=True)


In [0]:
!pip install scikit-learn

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-2e899ed4-198d-4b7d-b9ed-2f42f1e48747/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_squared_error

# Create and train the MLPRegressor
mlp_regressor = MLPRegressor(hidden_layer_sizes=(30, 20, 30, 10, 20, 40), max_iter=300)
mlp_regressor.fit(train_x, train_y)

# Make predictions on the train set
train_predictions = mlp_regressor.predict(train_x)

# Check the Goodness of Fit (on Train Data)
print("Goodness of Fit of Model \tTrain Dataset")
print("R2 Score \t:", mlp_regressor.score(train_x, train_y))
print("Mean Squared Error (MSE) \t:", mean_squared_error(train_y, train_predictions))
print()

# Make predictions on the test set
test_predictions = mlp_regressor.predict(test_x)

# Check the Goodness of Fit and Prediction Accuracy (on Test Data)
print("Goodness of Fit of Model and Prediction Accuracy \tTest Dataset")
print("R2 Score \t:", mlp_regressor.score(test_x, test_y))
print("Mean Squared Error (MSE) \t:", mean_squared_error(test_y, test_predictions))


Goodness of Fit of Model 	Train Dataset
R2 Score 	: -0.005174330627510493
Mean Squared Error (MSE) 	: 0.13079861999992706

Goodness of Fit of Model and Prediction Accuracy 	Test Dataset
R2 Score 	: -0.003530651304948673
Mean Squared Error (MSE) 	: 0.13926611024575306


  y = column_or_1d(y, warn=True)
