In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("F1 New")
  .getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://10.110.168.85:4041
SparkContext available as 'sc' (version = 3.3.2, master = local[*], app id = local-1681853168221)
SparkSession available as 'spark'


23/04/18 17:26:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@611527a3


In [2]:
val sc = spark.sparkContext

sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@28b59462


In [3]:
val drivers = spark.read.format("csv")
  .option("header", "true")
  .load("../data/drivers.csv")

val races = spark.read.format("csv")
  .option("header", "true")
  .load("../data/races.csv")

val quali = spark.read.format("csv")
  .option("header", "true")
  .load("../data/qualifying.csv")

val results = spark.read.format("csv")
  .option("header", "true")
  .load("../data/results.csv")

val constructors = spark.read.format("csv")
  .option("header", "true")
  .load("../data/constructors.csv")

drivers: org.apache.spark.sql.DataFrame = [driverId: string, driverRef: string ... 7 more fields]
races: org.apache.spark.sql.DataFrame = [raceId: string, year: string ... 16 more fields]
quali: org.apache.spark.sql.DataFrame = [qualifyId: string, raceId: string ... 7 more fields]
results: org.apache.spark.sql.DataFrame = [resultId: string, raceId: string ... 16 more fields]
constructors: org.apache.spark.sql.DataFrame = [constructorId: string, constructorRef: string ... 3 more fields]


In [17]:
// Filter Races based on "name"
val hungarianRace = races.filter(col("name") === "Hungarian Grand Prix")
val britishRace = races.filter(col("name") === "British Grand Prix")

// Join Quali and Drivers dataframes
val qualiDrivers = quali.join(drivers, Seq("driverId"))

// Join Results, Races, Constructors and Drivers dataframes
val resultsRaces = results.join(races, Seq("raceId"))
  .join(constructors, Seq("constructorId"))
  .join(drivers, Seq("driverId"))

// Merge data for Hungarian Grand Prix
val hungarianData = resultsRaces.join(hungarianRace, Seq("raceId"))
  .join(qualiDrivers, Seq("raceId", "driverId"), "left")
  .na.drop()

// Merge data for British Grand Prix
val britishData = resultsRaces.join(britishRace, Seq("raceId"))
  .join(qualiDrivers, Seq("raceId", "driverId"), "left")
  .na.drop()




hungarianRace: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [raceId: string, year: string ... 16 more fields]
britishRace: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [raceId: string, year: string ... 16 more fields]
qualiDrivers: org.apache.spark.sql.DataFrame = [driverId: string, qualifyId: string ... 15 more fields]
resultsRaces: org.apache.spark.sql.DataFrame = [driverId: string, constructorId: string ... 45 more fields]
hungarianData: org.apache.spark.sql.DataFrame = [raceId: string, driverId: string ... 77 more fields]
britishData: org.apache.spark.sql.DataFrame = [raceId: string, driverId: string ... 77 more fields]


In [39]:
// Merge data for Hungarian Grand Prix with aliases
val hungarianData = resultsRaces.alias("r")
  .join(hungarianRace.alias("hr"), Seq("raceId"))
  .join(qualiDrivers.alias("qd"), Seq("raceId", "driverId"), "left")
  .na.drop()
  .selectExpr("r.*", "hr.name as race_name", "hr.date as race_date", "hr.time as race_time", "qd.position as quali_position")

// Merge data for British Grand Prix with aliases
val britishData = resultsRaces.alias("r")
  .join(britishRace.alias("br"), Seq("raceId"))
  .join(qualiDrivers.alias("qd"), Seq("raceId", "driverId"), "left")
  .na.drop()
  .selectExpr("r.*", "br.name as race_name", "br.date as race_date", "br.time as race_time", "qd.position as quali_position")


hungarianData: org.apache.spark.sql.DataFrame = [raceId: string, driverId: string ... 49 more fields]
britishData: org.apache.spark.sql.DataFrame = [raceId: string, driverId: string ... 49 more fields]


In [41]:
import org.apache.spark.ml.feature.StringIndexer

// define list of columns to index
val indexedCols = Seq("driverId")

// create a StringIndexer for each column and fit/transform the data
val hungarianIndexed = indexedCols.foldLeft(hungarianData) { (df, col) =>
  val indexer = new StringIndexer().setInputCol(col).setOutputCol(col + "_indexed")
  indexer.fit(df).transform(df)
}

val britishIndexed = indexedCols.foldLeft(britishData) { (df, col) =>
  val indexer = new StringIndexer().setInputCol(col).setOutputCol(col + "_indexed")
  indexer.fit(df).transform(df)
}


import org.apache.spark.ml.feature.StringIndexer
indexedCols: Seq[String] = List(driverId)
hungarianIndexed: org.apache.spark.sql.DataFrame = [raceId: string, driverId: string ... 50 more fields]
britishIndexed: org.apache.spark.sql.DataFrame = [raceId: string, driverId: string ... 50 more fields]


In [42]:
hungarianIndexed.show(5)

+------+--------+-------------+--------+------+----+--------+------------+-------------+------+----+-------+------------+----------+----+--------------+---------------+--------+----+-----+---------+--------------------+----------+--------+--------------------+--------+--------+--------+--------+--------+--------+----------+----------+-----------+-----------+--------------+--------+-----------+--------------------+-----------+------+----+---------------+-------------+----------+-----------+--------------------+--------------------+----------+---------+--------------+----------------+
|raceId|driverId|constructorId|resultId|number|grid|position|positionText|positionOrder|points|laps|   time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|year|round|circuitId|                name|      date|    time|                 url|fp1_date|fp1_time|fp2_date|fp2_time|fp3_date|fp3_time|quali_date|quali_time|sprint_date|sprint_time|constructorRef|    name|nationality|              

In [43]:
val hungarianIndexedFinal = hungarianIndexed
                            .withColumn("grid", col("grid").cast("integer"))
                            .withColumn("fastestLapSpeed", col("fastestLapSpeed").cast("integer"))
                            .withColumn("position", col("position").cast("integer"))
val britishIndexedFinal = britishIndexed.withColumn("grid", col("grid").cast("integer"))
                                        .withColumn("fastestLapSpeed", col("fastestLapSpeed").cast("integer"))
                                        .withColumn("position", col("position").cast("integer"))

hungarianIndexedFinal: org.apache.spark.sql.DataFrame = [raceId: string, driverId: string ... 50 more fields]
britishIndexedFinal: org.apache.spark.sql.DataFrame = [raceId: string, driverId: string ... 50 more fields]


In [36]:
hungarianIndexedFinal.printSchema

root
 |-- raceId: string (nullable = true)
 |-- driverId: string (nullable = true)
 |-- constructorId: string (nullable = true)
 |-- resultId: string (nullable = true)
 |-- number: string (nullable = true)
 |-- grid: integer (nullable = true)
 |-- position: string (nullable = true)
 |-- positionText: string (nullable = true)
 |-- positionOrder: string (nullable = true)
 |-- points: string (nullable = true)
 |-- laps: string (nullable = true)
 |-- time: string (nullable = true)
 |-- milliseconds: string (nullable = true)
 |-- fastestLap: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- fastestLapTime: string (nullable = true)
 |-- fastestLapSpeed: integer (nullable = true)
 |-- statusId: string (nullable = true)
 |-- year: string (nullable = true)
 |-- round: string (nullable = true)
 |-- circuitId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- url: string (nullable = true)
 |-- 

In [32]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

// Split the data into training and test sets
val Array(hungarianTrain, hungarianTest) = hungarianIndexedFinal.randomSplit(Array(0.7, 0.3), seed=42)
val Array(britishTrain, britishTest) = britishIndexedFinal.randomSplit(Array(0.7, 0.3), seed=42)

// Define the input features for the model
val featureCols = Array("grid", "fastestLapSpeed")

// Create a VectorAssembler to combine the input features into a single vector
val assembler = new VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features")

// Apply the VectorAssembler to the training and test sets
val hungarianTrainFeatures = assembler.transform(hungarianTrain)
val hungarianTestFeatures = assembler.transform(hungarianTest)
val britishTrainFeatures = assembler.transform(britishTrain)
val britishTestFeatures = assembler.transform(britishTest)

// Create a LogisticRegression estimator
val lr = new LogisticRegression()

// Fit the estimator on the training set
val hungarianModel = lr.fit(hungarianTrainFeatures)
val britishModel = lr.fit(britishTrainFeatures)

// Use the trained model to make predictions on the test set
val hungarianPredictions = hungarianModel.transform(hungarianTestFeatures)
val britishPredictions = britishModel.transform(britishTestFeatures)


java.lang.IllegalArgumentException:  label does not exist. Available: raceId, driverId, constructorId, resultId, number, grid, position, positionText, positionOrder, points, laps, time, milliseconds, fastestLap, rank, fastestLapTime, fastestLapSpeed, statusId, year, round, circuitId, name, date, time, url, fp1_date, fp1_time, fp2_date, fp2_time, fp3_date, fp3_time, quali_date, quali_time, sprint_date, sprint_time, constructorRef, name, nationality, url, driverRef, number, code, forename, surname, dob, nationality, url, year, round, circuitId, name, date, time, url, fp1_date, fp1_time, fp2_date, fp2_time, fp3_date, fp3_time, quali_date, quali_time, sprint_date, sprint_time, qualifyId, constructorId, number, position, q1, q2, q3, driverRef, number, code, forename, surname, dob, nationality, url, driverId_indexed, features

In [54]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

// Remove rows with null values
val hungarianIndexedFinalNoNull = hungarianIndexedFinal.na.drop()

// Define the input features for the model
val featureCols = Array("grid", "fastestLapSpeed")

// Create a VectorAssembler to combine the input features into a single vector
val assembler = new VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features")
  .setHandleInvalid("skip") // Handle null values by skipping them

// Apply the VectorAssembler to the data
val dataWithFeatures = assembler.transform(hungarianIndexedFinalNoNull)

// Split the data into training and test sets
val Array(trainData, testData) = dataWithFeatures.randomSplit(Array(0.7, 0.3), seed=42)

// Define the LogisticRegression estimator
val lr = new LogisticRegression()
  .setLabelCol("position")
  .setFeaturesCol("features")

// Fit the estimator on the training set
val model = lr.fit(trainData)

// Use the trained model to make predictions on the test set
val predictions = model.transform(testData)



import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
hungarianIndexedFinalNoNull: org.apache.spark.sql.DataFrame = [raceId: string, driverId: string ... 50 more fields]
featureCols: Array[String] = Array(grid, fastestLapSpeed)
assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_756138155a0b, handleInvalid=skip, numInputCols=2
dataWithFeatures: org.apache.spark.sql.DataFrame = [raceId: string, driverId: string ... 51 more fields]
trainData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [raceId: string, driverId: string ... 51 more fields]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [raceId: string, driverId: string ....


In [55]:
predictions.show(5)

+------+--------+-------------+--------+------+----+--------+------------+-------------+------+----+---------+------------+----------+----+--------------+---------------+--------+----+-----+---------+--------------------+----------+--------+--------------------+--------+--------+--------+--------+--------+--------+----------+----------+-----------+-----------+--------------+-----------+-----------+--------------------+-----------+------+----+---------+-----------+----------+-----------+--------------------+--------------------+----------+---------+--------------+----------------+------------+--------------------+--------------------+----------+
|raceId|driverId|constructorId|resultId|number|grid|position|positionText|positionOrder|points|laps|     time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|year|round|circuitId|                name|      date|    time|                 url|fp1_date|fp1_time|fp2_date|fp2_time|fp3_date|fp3_time|quali_date|quali_time|sprint_da

In [51]:
import org.apache.spark.sql.functions.col

// Select necessary columns from testData and predictions
val predictionsWithName = predictions.select("surname", "prediction")



import org.apache.spark.sql.functions.col
predictionsWithName: org.apache.spark.sql.DataFrame = [surname: string, prediction: double]


In [78]:
val diverPrediction = predictions
  .groupBy("surname","psoition")
  .agg(round(avg("prediction"), 0).as("prediction_driver"))

org.apache.spark.sql.AnalysisException:  Cannot resolve column name "psoition" among (raceId, driverId, constructorId, resultId, number, grid, position, positionText, positionOrder, points, laps, time, milliseconds, fastestLap, rank, fastestLapTime, fastestLapSpeed, statusId, year, round, circuitId, name, date, time, url, fp1_date, fp1_time, fp2_date, fp2_time, fp3_date, fp3_time, quali_date, quali_time, sprint_date, sprint_time, constructorRef, name, nationality, url, driverRef, number, code, forename, surname, dob, nationality, url, race_name, race_date, race_time, quali_position, driverId_indexed, features, rawPrediction, probability, prediction)

In [75]:
diverPrediction.show()

+-----------+-----------------+
|    surname|prediction_driver|
+-----------+-----------------+
|   Nakajima|             11.0|
|Barrichello|             11.0|
|   Monteiro|             18.0|
|  Magnussen|             12.0|
|  Coulthard|              9.0|
|    Rosberg|              5.0|
|  Ricciardo|             11.0|
|     Bottas|              2.0|
|     Stroll|             14.0|
|       Ocon|              7.0|
|      Buemi|             13.0|
|      Klien|             11.0|
|     Alonso|              5.0|
|     Vergne|             11.0|
|      Gasly|              6.0|
|   Grosjean|              6.0|
|     Webber|              6.0|
|     Latifi|             10.0|
|     Trulli|             10.0|
|    Russell|             11.0|
+-----------+-----------------+
only showing top 20 rows



In [77]:
diverPrediction.write
  .format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save("output2.csv")

In [72]:
val teamPrediction = predictions
  .groupBy("constructorRef")
  .agg(round(avg("prediction"), 0).as("prediction_team"))

teamPrediction: org.apache.spark.sql.DataFrame = [constructorRef: string, prediction_team: double]


In [73]:
teamPrediction.show()

+--------------+---------------+
|constructorRef|prediction_team|
+--------------+---------------+
|        sauber|           10.0|
|        jaguar|           11.0|
|          alfa|           14.0|
|  lotus_racing|           18.0|
|      red_bull|            4.0|
|    bmw_sauber|           13.0|
|       mclaren|            7.0|
|        jordan|           15.0|
|      mercedes|            8.0|
|        toyota|            5.0|
|      marussia|           18.0|
|      williams|           10.0|
|           bar|            4.0|
|  racing_point|           17.0|
|   force_india|           11.0|
|    toro_rosso|           10.0|
|       ferrari|            6.0|
|        virgin|           19.0|
|        alpine|            4.0|
|      lotus_f1|            2.0|
+--------------+---------------+
only showing top 20 rows



In [76]:
teamPrediction.write
  .format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save("output.csv")
