In [60]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf 

In [61]:
# Start (or reuse, if one already exists in this kernel) a Spark session
# running in local mode; "local[*]" uses as many worker threads as there
# are logical cores on this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("join")
    .getOrCreate()
)

In [62]:
# Load the four source tables used by this analysis.
# The first line of every file is treated as the column names, and Spark
# infers each column's type from the data instead of reading all as strings.
csv_read_options = {"header": True, "inferSchema": True}

races_df = spark.read.csv("dataset/races.csv", **csv_read_options)
results_df = spark.read.csv("dataset/results.csv", **csv_read_options)
drivers_df = spark.read.csv("dataset/drivers.csv", **csv_read_options)
pit_stops_df = spark.read.csv("dataset/pit_stops.csv", **csv_read_options)

# To preview any of the tables while developing, call e.g. `races_df.show(5)`
# (and likewise for results_df, drivers_df, pit_stops_df).

In [63]:
# Count how many pit stops each race winner made.
#
# Result: a Python list of
#     ((raceId, year, race name, "forename surname"), pit_stop_count)
# tuples, sorted by key (i.e. by raceId first).
#
# NOTE: despite its name, `joined_df` is a plain list (the result of
# `.collect()`), not a DataFrame. The name is kept unchanged so that any
# later cell referring to it keeps working.
joined_df = (
    # Keep only the winning result of every race. `position` is compared
    # against the string "1", exactly as in the original analysis.
    results_df.filter(results_df["position"] == "1")
    # One output row per pit stop of the winner. `join` defaults to an inner
    # join, so winners without any row in pit_stops_df do not appear at all.
    .join(pit_stops_df, on=["raceId", "driverId"])
    .join(drivers_df[["driverId", "forename", "surname"]], on="driverId")
    .join(races_df[["raceId", "name", "year"]], on="raceId")
    # Each pit-stop row contributes a count of 1 to its (race, driver) key.
    .rdd.map(
        lambda row: (
            (
                row["raceId"],
                row["year"],
                row["name"],
                row["forename"] + " " + row["surname"],
            ),
            1,
        )
    )
    .reduceByKey(lambda x, y: x + y)
    # Sort AFTER aggregating: reduceByKey re-shuffles the data by key hash, so
    # a sort placed before it does not guarantee the order of the final result.
    .sortByKey()
    .collect()
)

joined_df

[((841, 2011, 'Australian Grand Prix', 'Sebastian Vettel'), 2),
 ((842, 2011, 'Malaysian Grand Prix', 'Sebastian Vettel'), 3),
 ((843, 2011, 'Chinese Grand Prix', 'Lewis Hamilton'), 3),
 ((844, 2011, 'Turkish Grand Prix', 'Sebastian Vettel'), 4),
 ((845, 2011, 'Spanish Grand Prix', 'Sebastian Vettel'), 4),
 ((846, 2011, 'Monaco Grand Prix', 'Sebastian Vettel'), 1),
 ((847, 2011, 'Canadian Grand Prix', 'Jenson Button'), 6),
 ((848, 2011, 'European Grand Prix', 'Sebastian Vettel'), 3),
 ((849, 2011, 'British Grand Prix', 'Fernando Alonso'), 3),
 ((850, 2011, 'German Grand Prix', 'Lewis Hamilton'), 3),
 ((851, 2011, 'Hungarian Grand Prix', 'Jenson Button'), 3),
 ((852, 2011, 'Belgian Grand Prix', 'Sebastian Vettel'), 3),
 ((853, 2011, 'Italian Grand Prix', 'Sebastian Vettel'), 2),
 ((854, 2011, 'Singapore Grand Prix', 'Sebastian Vettel'), 3),
 ((855, 2011, 'Japanese Grand Prix', 'Jenson Button'), 3),
 ((856, 2011, 'Korean Grand Prix', 'Sebastian Vettel'), 2),
 ((857, 2011, 'Indian Grand P