In [19]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructField, StructType, StringType, LongType, DoubleType, IntegerType, DateType

# Spark configuration: standalone cluster master, application name, and the
# driver / executor resource settings, built as one fluent chain.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Trackwide_data")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The SparkSession is the entry point to the Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the Google Cloud Storage connector classes so that gs:// paths
# resolve through the Hadoop filesystem layer.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for hadoop_key, gcs_class in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(hadoop_key, gcs_class)


# ---------------------------------------------------------------------------
# Circuits
# ---------------------------------------------------------------------------

# Explicit schema for circuits.csv, declared as (column, type) pairs.
# Every column is nullable.
circuit_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("circuitID", StringType()),
        ("circuitRef", StringType()),
        ("name", StringType()),
        ("location", StringType()),
        ("country", StringType()),
        ("lat", DoubleType()),
        ("lng", DoubleType()),
        ("alt", LongType()),
        ("url", StringType()),
    ]
])

gsc_file_path_circuits = 'gs://data-group1-ass2/circuits.csv'

# Read the CSV with the schema above; the first line of the file is a header.
circuits = spark.read.csv(gsc_file_path_circuits, schema=circuit_schema, header=True)
circuits.show(3)

# ---------------------------------------------------------------------------
# Drivers
# ---------------------------------------------------------------------------

# Explicit schema for drivers.csv, declared as (column, type) pairs.
# Every column is nullable; `dob` is parsed as a date.
drivers_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("driverID", StringType()),
        ("driverRef", StringType()),
        ("number", LongType()),
        ("code", StringType()),
        ("forename", StringType()),
        ("surname", StringType()),
        ("dob", DateType()),
        ("nationality", StringType()),
        ("url", StringType()),
    ]
])

gsc_file_path_drivers = 'gs://data-group1-ass2/drivers.csv'

# Read the CSV with the schema above; the first line of the file is a header.
drivers = spark.read.csv(gsc_file_path_drivers, schema=drivers_schema, header=True)
drivers.show(3)

# ---------------------------------------------------------------------------
# Lap times
# ---------------------------------------------------------------------------

# Explicit schema for lap_times.csv, declared as (column, type) pairs.
# Every column is nullable. `time` is kept as a plain string; a
# DayTimeIntervalType("MINUTE", "SECOND") column was considered as an
# alternative. `miliseconds` is the numeric column used for ordering later on.
lap_times_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("raceID", StringType()),
        ("driverID", StringType()),
        ("lap", LongType()),
        ("position", LongType()),
        ("time", StringType()),
        ("miliseconds", LongType()),
    ]
])

gsc_file_path_lap_times = 'gs://data-group1-ass2/lap_times.csv'

# Read the CSV with the schema above; the first line of the file is a header.
lap_times = spark.read.csv(gsc_file_path_lap_times, schema=lap_times_schema, header=True)
lap_times.show(3)

# ---------------------------------------------------------------------------
# Pit stops
# ---------------------------------------------------------------------------

# Explicit schema for pit_stops.csv, declared as (column, type) pairs.
# Every column is nullable. `time` is kept as a plain string; a
# DayTimeIntervalType("MINUTE", "SECOND") column was considered as an
# alternative.
pit_stops_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("raceID", StringType()),
        ("driverID", StringType()),
        ("stop", IntegerType()),
        ("lap", IntegerType()),
        ("time", StringType()),
        ("duration", DoubleType()),
        ("miliseconds", LongType()),
    ]
])

gsc_file_path_pit_stops = 'gs://data-group1-ass2/pit_stops.csv'

# Read the CSV with the schema above; the first line of the file is a header.
pit_stops = spark.read.csv(gsc_file_path_pit_stops, schema=pit_stops_schema, header=True)
pit_stops.show(3)

# ---------------------------------------------------------------------------
# Races
# ---------------------------------------------------------------------------

# Explicit schema for races.csv, declared as (column, type) pairs.
# Every column is nullable. The date columns are parsed as dates, while the
# time-of-day columns (`time`, `fp1_time`) are kept as plain strings; a
# DayTimeIntervalType("MINUTE", "SECOND") column was considered as an
# alternative for them.
races_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("raceID", StringType()),
        ("year", IntegerType()),
        ("round", IntegerType()),
        ("circuitID", StringType()),
        ("name", StringType()),
        ("date", DateType()),
        ("time", StringType()),
        ("url", StringType()),
        ("fp1_date", DateType()),
        ("fp1_time", StringType()),
    ]
])

gsc_file_path_races = 'gs://data-group1-ass2/races.csv'

# Read the CSV with the schema above; the first line of the file is a header.
races = spark.read.csv(gsc_file_path_races, schema=races_schema, header=True)
races.show(3)

+---------+-----------+--------------------+------------+---------+--------+-------+---+--------------------+
|circuitID| circuitRef|                name|    location|  country|     lat|    lng|alt|                 url|
+---------+-----------+--------------------+------------+---------+--------+-------+---+--------------------+
|        1|albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|144.968| 10|http://en.wikiped...|
|        2|     sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|101.738| 18|http://en.wikiped...|
|        3|    bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|50.5106|  7|http://en.wikiped...|
+---------+-----------+--------------------+------------+---------+--------+-------+---+--------------------+
only showing top 3 rows

+--------+---------+------+----+--------+--------+----------+-----------+--------------------+
|driverID|driverRef|number|code|forename| surname|       dob|nationality|                 url|
+--------+-----

In [20]:
from pyspark.sql.functions import struct

# Keep only the columns needed by the analysis and discard rows whose key
# columns are null.
#
# Spark DataFrames are immutable: `na.drop` returns a NEW DataFrame and leaves
# the original untouched. The result therefore has to be assigned back;
# calling `df.na.drop(...)` as a bare statement silently drops nothing.
circuits = (
    circuits
    .select("circuitID", "name")
    .na.drop("any", subset=["circuitID"])
)

# Driver names are packed into a single struct column `full_name`.
drivers = (
    drivers
    .select("driverID", struct("forename", "surname").alias("full_name"))
    .na.drop("any", subset=["driverID"])
)

lap_times = (
    lap_times
    .select("raceID", "driverID", "time", "miliseconds")
    .na.drop("any", subset=["raceID", "driverID"])
)

pit_stops = (
    pit_stops
    .select("raceID", "driverID", "duration")
    .na.drop("any", subset=["raceID", "driverID"])
)

races = (
    races
    .select("raceID", "circuitID", "date")
    .na.drop("any", subset=["raceID", "circuitID"])
)

DataFrame[raceID: string, circuitID: string, date: date]

Find the three fastest laps per race

In [21]:
from pyspark.sql.functions import min, dense_rank,col
from pyspark.sql import Row, Window

# Rank the laps of each race from fastest to slowest by their duration in
# milliseconds. `dense_rank` gives tied laps the same rank without leaving
# gaps, so ranks 1..3 are the three fastest distinct lap times of a race.
window_find_fasest = Window.partitionBy("raceID").orderBy(col("miliseconds").asc())

# An ascending sort in Spark places NULLs first, so a lap whose duration is
# null (e.g. a value that could not be parsed as a long) would otherwise be
# ranked as the "fastest" lap of its race. Such laps are excluded up front.
lap_times_with_rank = (
    lap_times
    .where(col("miliseconds").isNotNull())
    .withColumn("rank_asc_per_race", dense_rank().over(window_find_fasest))
)

# dense_rank starts at 1, so `<= 3` selects exactly the ranks 1, 2 and 3.
lap_times_top3_per_race = lap_times_with_rank.where(col("rank_asc_per_race") <= 3)

Find the three fastest laps per circuit

In [22]:
# Attach race information (circuit and date) to the per-race top-3 laps.
races_with_top_3_laps = races.join(lap_times_top3_per_race, on="raceID")

# Within each circuit, order the laps by duration (ascending by default).
window_find_fasest_circuit = Window.partitionBy("circuitID").orderBy("miliseconds")

# Rank the laps per circuit; tied laps share a rank (dense_rank leaves no gaps).
races_with_lap_rank = races_with_top_3_laps.withColumn(
    "rank_asc_per_circuit",
    dense_rank().over(window_find_fasest_circuit),
)

# Keep the laps ranked first, second or third on their circuit.
lap_times_top3_per_circuit = races_with_lap_rank.where(
    col("rank_asc_per_circuit").isin(1, 2, 3)
)

lap_times_top3_per_circuit.show(50)

+------+---------+----------+--------+--------+-----------+-----------------+--------------------+
|raceID|circuitID|      date|driverID|    time|miliseconds|rank_asc_per_race|rank_asc_per_circuit|
+------+---------+----------+--------+--------+-----------+-----------------+--------------------+
|  1076|        1|2022-04-10|     844|1:20.260|      80260|                1|                   1|
|  1076|        1|2022-04-10|       4|1:20.846|      80846|                2|                   2|
|  1076|        1|2022-04-10|     844|1:20.966|      80966|                3|                   3|
|   101|       10|2004-07-25|       8|1:13.780|      73780|                1|                   1|
|   101|       10|2004-07-25|      30|1:13.783|      73783|                2|                   2|
|   101|       10|2004-07-25|      30|1:13.864|      73864|                3|                   3|
|   136|       11|2002-08-18|      30|1:16.207|      76207|                1|                   1|
|  1033|  

Add the driver name and circuit name

In [27]:
from pyspark.sql.functions import expr

# Enrich the per-circuit top-3 laps with the driver's name and the circuit's name.
fastest_laps_combined_data = (
    lap_times_top3_per_circuit
    .join(drivers, ['driverID'])
    .join(circuits, ['circuitID'])
)


def _laps_with_rank(laps, rank, label):
    """Select the laps holding a given per-circuit rank and label their columns.

    Parameters
    ----------
    laps : DataFrame
        Laps carrying the columns `rank_asc_per_circuit`, `name`, `full_name`,
        `time` and `date`.
    rank : int
        The per-circuit rank to keep (1, 2 or 3).
    label : str
        Prefix used in the output column names, e.g. "fastest".

    Returns
    -------
    DataFrame
        Columns `circuit_name`, `<label>_driver`, `<label>_lap_time` and
        `date_<label>` for the laps with the requested rank.
    """
    return laps.where(f"rank_asc_per_circuit == {rank}").selectExpr(
        "name as circuit_name",
        f"full_name as {label}_driver",
        f"time as {label}_lap_time",
        f"date as date_{label}",
    )


# One frame per rank; the three only differ in the rank and the column prefix.
first = _laps_with_rank(fastest_laps_combined_data, 1, "fastest")
second = _laps_with_rank(fastest_laps_combined_data, 2, "second_fastest")
third = _laps_with_rank(fastest_laps_combined_data, 3, "third_fastest")

# Put the three ranks side by side: one row per circuit (more if laps are tied).
# These are inner joins, so a circuit missing any of the three ranks is dropped.
fastest_lap_final_data = first.join(second, ['circuit_name']).join(third, ['circuit_name'])

fastest_lap_final_data.sort('circuit_name').show(150)

+--------------------+--------------------+----------------+------------+---------------------+-----------------------+-------------------+--------------------+----------------------+------------------+
|        circuit_name|      fastest_driver|fastest_lap_time|date_fastest|second_fastest_driver|second_fastest_lap_time|date_second_fastest|third_fastest_driver|third_fastest_lap_time|date_third_fastest|
+--------------------+--------------------+----------------+------------+---------------------+-----------------------+-------------------+--------------------+----------------------+------------------+
|Albert Park Grand...|  {Charles, Leclerc}|        1:20.260|  2022-04-10|   {Fernando, Alonso}|               1:20.846|         2022-04-10|  {Charles, Leclerc}|              1:20.966|        2022-04-10|
|Autodromo Enzo e ...|   {Lewis, Hamilton}|        1:15.484|  2020-11-01|   {Valtteri, Bottas}|               1:15.902|         2020-11-01|   {Lewis, Hamilton}|              1:15.914|     

In [30]:
# The BigQuery connector uses this Cloud Storage bucket for the temporary
# export data it produces while writing.
bucket = "temp_group1_ass2"
spark.conf.set('temporaryGcsBucket', bucket)

# Append the per-circuit result, sorted by circuit name, to the BigQuery table.
# NOTE(review): the last recorded run of this cell failed with a 403
# ACCESS_TOKEN_SCOPE_INSUFFICIENT error. Presumably the credentials available
# to the Spark runtime lack a BigQuery access scope; confirm and fix on the
# environment side.
(
    fastest_lap_final_data
    .sort('circuit_name')
    .write
    .format('bigquery')
    .option('table', 'deassignment2.Output_processing_pipeline.fastest_laps')
    .mode("append")
    .save()
)

Py4JJavaError: An error occurred while calling o679.save.
: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Request had insufficient authentication scopes.
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:299)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$18.call(BigQueryImpl.java:778)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$18.call(BigQueryImpl.java:775)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.run(RetryHelper.java:76)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getTable(BigQueryImpl.java:774)
	at com.google.cloud.bigquery.connector.common.BigQueryClient.getTable(BigQueryClient.java:121)
	at com.google.cloud.spark.bigquery.write.BigQueryInsertableRelationBase.<init>(BigQueryInsertableRelationBase.java:47)
	at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.<init>(BigQueryDeprecatedIndirectInsertableRelation.java:33)
	at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createBigQueryInsertableRelationInternal(CreatableRelationProviderHelper.java:119)
	at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createBigQueryInsertableRelation(CreatableRelationProviderHelper.java:95)
	at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:47)
	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:106)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
GET https://www.googleapis.com/bigquery/v2/projects/deassignment2/datasets/Output_processing_pipeline/tables/fastest_laps?prettyPrint=false
{
  "code" : 403,
  "details" : [ {
    "@type" : "type.googleapis.com/google.rpc.ErrorInfo",
    "reason" : "ACCESS_TOKEN_SCOPE_INSUFFICIENT"
  } ],
  "errors" : [ {
    "domain" : "global",
    "message" : "Insufficient Permission",
    "reason" : "insufficientPermissions"
  } ],
  "message" : "Request had insufficient authentication scopes.",
  "status" : "PERMISSION_DENIED"
}
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:439)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:525)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:576)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:297)
	... 54 more


In [5]:
# Shut down the Spark session created at the top of the notebook.
# Run this cell last: every cell that uses `spark` needs the session to be alive.
spark.stop()