# Importing the necessary libraries

In [1]:
import yaml
import os
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType
import json

In [2]:
# Reuse the active SparkSession if there is one, otherwise start a new one.
# NOTE(review): the app name looks left over from a YAML-to-CSV step; this
# notebook reads CSV and loads Neo4j — consider renaming.
spark = (
    SparkSession.builder
    .appName("YAML to CSV")
    .getOrCreate()
)


25/01/06 13:05:02 WARN Utils: Your hostname, Obuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/01/06 13:05:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/06 13:05:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, BooleanType

# Location of the per-season driver entries exported as CSV. Set the
# F1_DRIVERS_CSV_PATH environment variable to point elsewhere instead of editing
# the notebook; the default is the path this notebook was originally run with.
DRIVERS_CSV_PATH = os.environ.get(
    "F1_DRIVERS_CSV_PATH",
    "/home/floppabox/f1/f1-data-project-gr/csv_datasets/driversAllYears",
)

# Explicit schema so `year` is read as an integer and the two flags as booleans
# instead of strings. The CSV header, per the original note, is
#   year,entrant,constructor,driver,tookPart,TestDriver
# NOTE(review): with an explicit schema Spark's CSV reader applies it by column
# position by default (enforceSchema=true), so this field order must follow the
# file's column order even though the names differ — confirm if the export changes.
schema = StructType([
    StructField("year", IntegerType(), True),
    StructField("entrantId", StringType(), True),
    StructField("constructorId", StringType(), True),
    StructField("driverId", StringType(), True),
    StructField("tookPart", BooleanType(), True),
    StructField("testDriver", BooleanType(), True),
])

readDataset = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv(DRIVERS_CSV_PATH)
)


In [4]:
# Debut years: for every driver, the earliest season with a row where tookPart
# is true. Published as global_temp.debut for the later cells.

# Make the raw per-season rows queryable as global_temp.driversAllYears.
readDataset.createOrReplaceGlobalTempView("driversAllYears")

debut_sql = """SELECT MIN(year) as debut_year, driverId
          FROM global_temp.driversAllYears
          WHERE tookPart == True
          GROUP BY driverId"""
debut_years = spark.sql(debut_sql)
debut_years.createOrReplaceGlobalTempView("debut")

In [5]:
# Exploratory retirement query: each driver's last season with tookPart = True,
# kept only when it is earlier than the newest season in the dataset.
# NOTE(review): the cut-off is MAX(year) over *all* rows, including entries
# where nobody has raced yet, so drivers whose last race was the most recent
# completed season (the 2024 rows in the output) are listed as retired too.
# Superseded by the fixed retirement query further down.
first_retirement_sql = """SELECT *
          FROM (
            SELECT MAX(year) as retirement_year, driverId
            FROM global_temp.driversAllYears
            WHERE tookPart == True
            GROUP BY driverId
          ) AS temp_ret
          WHERE retirement_year < (SELECT MAX(year) as maximum FROM global_temp.driversAllYears)"""
spark.sql(first_retirement_sql).show(300)

                                                                                

+---------------+--------------------+
|retirement_year|            driverId|
+---------------+--------------------+
|           2015|       roberto-merhi|
|           2012|    pedro-de-la-rosa|
|           2012|    jerome-dambrosio|
|           2003|       nicolas-kiesa|
|           2013|         charles-pic|
|           2009|giancarlo-fisichella|
|           2009|    nelson-piquet-jr|
|           2006|            yuji-ide|
|           2021|      nikita-mazepin|
|           2007|   markus-winkelhock|
|           2000|      johnny-herbert|
|           2014|     kamui-kobayashi|
|           2024|        pierre-gasly|
|           2020|        daniil-kvyat|
|           2021|      kimi-raikkonen|
|           2013|         mark-webber|
|           2024|    daniel-ricciardo|
|           2024|      logan-sargeant|
|           2015|        will-stevens|
|           2003|        ralph-firman|
|           2018|   stoffel-vandoorne|
|           2024|        lance-stroll|
|           2006|  juan-p

In [6]:
# Transfers: a driver's last season with one constructor followed by their first
# season with a different constructor, either in the next season or in the same
# season (a mid-season switch).
#
# Each (driver, constructor) pair is reduced once to a span
# [first_year, last_year] over the seasons with tookPart = True, and both sides
# of the self-join read from that single CTE.
# NOTE(review): because a span is MIN..MAX per constructor, a driver who leaves a
# team and later returns to it gets one span covering both stints. Moves into
# or out of the middle of that span are therefore not reported as transfers.
transferDB = spark.sql("""
    WITH spans AS (
        SELECT driverId,
               constructorId,
               MIN(year) AS first_year,
               MAX(year) AS last_year
        FROM global_temp.driversAllYears
        WHERE tookPart = True
        GROUP BY driverId, constructorId
    )
    SELECT out_span.driverId,
           out_span.constructorId AS const_out,
           out_span.last_year     AS transfer_out,
           in_span.constructorId  AS const_in,
           in_span.first_year     AS transfer_in
    FROM spans AS out_span
    INNER JOIN spans AS in_span
        ON out_span.driverId = in_span.driverId
       AND out_span.constructorId != in_span.constructorId
       -- next-season move (transfer_in - 1) or same-season move (transfer_in)
       AND out_span.last_year BETWEEN in_span.first_year - 1 AND in_span.first_year
""")

transferDB.createOrReplaceGlobalTempView("transfer")

In [7]:
# Break candidates: pair a driver's last season with one constructor with their
# first season with a constructor more than one season later
# (gap = return_year - break_year, so gap >= 2).
#
# Every earlier span is paired with every sufficiently later span, so this
# over-generates. The filteredGap cells below prune the candidates.
# (driverId, constructorId) is unique in `spans`, so each joined pair is already
# a distinct row and no GROUP BY / DISTINCT is needed.
# TODO: flagged by the author as still needing updates.
# NOTE(review): spans are MIN..MAX years per (driver, constructor), so a return
# to a constructor the driver already raced for is folded into the same span
# and is never reported as a break.
spark.sql("""
    WITH spans AS (
        SELECT driverId,
               constructorId,
               MIN(year) AS first_year,
               MAX(year) AS last_year
        FROM global_temp.driversAllYears
        WHERE tookPart = True
        GROUP BY driverId, constructorId
    )
    SELECT before_gap.driverId,
           before_gap.constructorId                     AS const_out,
           before_gap.last_year                         AS break_year,
           after_gap.constructorId                      AS const_in,
           after_gap.first_year                         AS return_year,
           after_gap.first_year - before_gap.last_year  AS gap
    FROM spans AS before_gap
    INNER JOIN spans AS after_gap
        ON before_gap.driverId = after_gap.driverId
       AND before_gap.last_year < after_gap.first_year - 1
""").createOrReplaceGlobalTempView("gapBreak")


In [8]:
# Sanity check: which global temporary views have been registered so far.
spark.catalog.listTables(dbName="global_temp")

[Table(name='debut', catalog=None, namespace=['global_temp'], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='driversAllYears', catalog=None, namespace=['global_temp'], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='gapBreak', catalog=None, namespace=['global_temp'], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='transfer', catalog=None, namespace=['global_temp'], description=None, tableType='TEMPORARY', isTemporary=True)]

In [9]:
# Spot-check the transfer view against one driver.
hulkenberg_transfers = spark.sql("SELECT * FROM global_temp.transfer WHERE driverId == 'nico-hulkenberg'")
hulkenberg_transfers.show(3000)

                                                                                

+---------------+------------+------------+------------+-----------+
|       driverId|   const_out|transfer_out|    const_in|transfer_in|
+---------------+------------+------------+------------+-----------+
|nico-hulkenberg|aston-martin|        2022|        haas|       2023|
|nico-hulkenberg| force-india|        2016|     renault|       2017|
|nico-hulkenberg|     renault|        2019|racing-point|       2020|
+---------------+------------+------------+------------+-----------+



In [10]:
# Remove break candidates that coincide with a transfer: LEFT ANTI JOIN keeps
# only gapBreak rows whose (driverId, const_in, return_year) has no matching
# (driverId, const_in, transfer_in) row in the transfer view.
gaps_without_transfers = spark.sql("""
    SELECT gb.*
    FROM global_temp.gapBreak gb
    LEFT ANTI JOIN global_temp.transfer t
    ON gb.driverId = t.driverId AND gb.const_in = t.const_in AND gb.return_year == t.transfer_in
""")
gaps_without_transfers.createOrReplaceGlobalTempView("filteredGap")

In [11]:
# Prune the remaining break candidates in two passes, each keeping only the
# row(s) with the smallest gap:
#   1. per return     (driverId, const_in,  return_year): closest earlier departure;
#   2. per departure  (driverId, const_out, break_year):  closest later return.
closest_departure_per_return = spark.sql("""
    SELECT fg.*
    FROM global_temp.filteredGap fg 
        INNER JOIN (SELECT driverId, const_in, return_year, MIN(gap) AS act
                    FROM global_temp.filteredGap fg
                    GROUP BY driverId, const_in, return_year) AS ab
        ON ab.driverId == fg.driverId AND ab.const_in == fg.const_in AND ab.return_year == fg.return_year AND fg.gap == ab.act
""")
# NOTE(review): this replaces global_temp.filteredGap with a query that reads
# global_temp.filteredGap. It evidently runs (breakDB is used later), presumably
# because the DataFrame's plan is already resolved. Still, the view name now
# means something different from the previous cell's view, and a distinct name
# would make partial re-runs easier to reason about.
closest_departure_per_return.createOrReplaceGlobalTempView("filteredGap")

# Pass 2 reads the view replaced just above, i.e. the pass-1 output.
breakDB = spark.sql("""
    SELECT fg.*
    FROM global_temp.filteredGap fg 
        INNER JOIN (SELECT driverId, const_out, break_year, MIN(gap) AS act
                    FROM global_temp.filteredGap fg
                    GROUP BY driverId, const_out, break_year) AS ab
        ON ab.driverId == fg.driverId AND ab.const_out == fg.const_out AND ab.break_year == fg.break_year AND fg.gap == ab.act
""")

In [12]:
# Debut team(s): join each driver's debut year back to the per-season rows to
# find the constructor(s) they drove for in that season.
# Only rows with tookPart = True are used, matching how debut_year itself is
# computed (and how the retirement query picks constructors). Otherwise a
# same-season row with tookPart = False — presumably a test/reserve entry —
# would add an extra, spurious DEBUT team.
temp = spark.sql("""SELECT db.*, dr.constructorId
            FROM global_temp.driversAllYears dr
                INNER JOIN global_temp.debut db
                ON db.debut_year == dr.year AND db.driverId == dr.driverId
            WHERE dr.tookPart = True""")

In [13]:
from neo4j import GraphDatabase

# Load debuts, transfers and breaks into Neo4j as Driver/Team nodes connected by
# DEBUT, TRANSFER_OUT/TRANSFER_IN and BREAK/RETURN relationships.

# Connection settings come from the environment so credentials need not live in
# the notebook; the fallbacks are the values this notebook originally used.
uri = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
username = os.environ.get("NEO4J_USERNAME", "neo4j")
password = os.environ.get("NEO4J_PASSWORD", "password")

batch_size = 500  # rows sent per UNWIND query

# MERGE matches an existing node/relationship or creates it, so re-running the
# load does not duplicate identical nodes or relationships.
DEBUT_CYPHER = """
UNWIND $batch AS row
MERGE (d:Driver {name: row.driverId})
MERGE (t:Team {name: row.constructorId})
MERGE (d)-[:DEBUT {year: row.debut_year}]->(t)
"""

TRANSFER_CYPHER = """
UNWIND $batch AS row
MERGE (d:Driver {name: row.driverId})
MERGE (to:Team {name: row.const_out})
MERGE (ti:Team {name: row.const_in})
MERGE (to)-[:TRANSFER_OUT {year: row.transfer_out}]->(d)
MERGE (d)-[:TRANSFER_IN {year: row.transfer_in}]->(ti)
"""

BREAK_CYPHER = """
UNWIND $batch AS row
MERGE (d:Driver {name: row.driverId})
MERGE (to:Team {name: row.const_out})
MERGE (ti:Team {name: row.const_in})
MERGE (to)-[:BREAK {year: row.break_year}]->(d)
MERGE (d)-[:RETURN {year: row.return_year}]->(ti)
"""


def _merge_in_batches(session, cypher, rows, fields, size):
    """Run `cypher` once per chunk of at most `size` rows.

    Each Spark Row is reduced to a plain dict holding only `fields`; the list of
    dicts is bound to the `$batch` parameter that the Cypher UNWINDs.
    """
    for start in range(0, len(rows), size):
        batch = [{field: row[field] for field in fields} for row in rows[start:start + size]]
        # consume() waits for the batch to finish so any failure is reported
        # for this batch before the next one is sent.
        session.run(cypher, {"batch": batch}).consume()


# Materialise the Spark results on the driver: `temp` holds debut teams,
# `transferDB` the transfers and `breakDB` the filtered breaks.
debut = temp.collect()
transfer = transferDB.collect()
breakData = breakDB.collect()
# NOTE: retirements are not loaded here. `retirementDB` is only defined by the
# fixed retirement-query cell further down, so reading it in this cell raised
# NameError on a fresh kernel (and the value was never used here).

neo4j_driver = GraphDatabase.driver(uri, auth=(username, password))
try:
    with neo4j_driver.session() as session:
        _merge_in_batches(session, DEBUT_CYPHER, debut,
                          ("driverId", "constructorId", "debut_year"),
                          batch_size)
        _merge_in_batches(session, TRANSFER_CYPHER, transfer,
                          ("driverId", "const_out", "transfer_out", "const_in", "transfer_in"),
                          batch_size)
        _merge_in_batches(session, BREAK_CYPHER, breakData,
                          ("driverId", "const_out", "break_year", "const_in", "return_year"),
                          batch_size)
finally:
    # Release the connection pool even if a batch fails.
    neo4j_driver.close()

                                                                                

In [22]:
# Fixed retirement query.
#
# The newest season in the data (latest_year) is treated as the current season
# and ignored when computing retirement years. A driver's retirement year is the
# last earlier season with tookPart = True. A driver is reported as retired when
#   * that season is older than the last completed season (previous_year), or
#   * they have no entry at all in the current season.
# There is one row per constructor the driver raced for in their retirement
# year, so a mid-season team switch yields several rows (see fisichella 2009).
#
# The current-season check uses every entrant, whatever the value of tookPart.
# Restricting it to tookPart = False only worked while the newest season had no
# races yet; afterwards active drivers would have been flagged as retired.
# driverId IS NOT NULL keeps NOT IN from evaluating to NULL for every row when
# the entrant list contains a NULL.
retirementDB = spark.sql("""
    WITH latest AS (
        SELECT MAX(year) AS latest_year
        FROM global_temp.driversAllYears
    ),
    previous AS (
        SELECT MAX(year) AS previous_year
        FROM global_temp.driversAllYears
        WHERE year < (SELECT latest_year FROM latest)
    ),
    raced AS (
        SELECT year, driverId, constructorId
        FROM global_temp.driversAllYears
        WHERE tookPart = True AND year < (SELECT latest_year FROM latest)
    ),
    last_season AS (
        SELECT driverId, MAX(year) AS retirement_year
        FROM raced
        GROUP BY driverId
    ),
    current_entrants AS (
        SELECT DISTINCT driverId
        FROM global_temp.driversAllYears
        WHERE year = (SELECT latest_year FROM latest) AND driverId IS NOT NULL
    )
    SELECT ls.driverId, r.constructorId, ls.retirement_year
    FROM last_season ls
    INNER JOIN raced r
        ON r.driverId = ls.driverId AND r.year = ls.retirement_year
    WHERE ls.retirement_year < (SELECT previous_year FROM previous)
       OR ls.driverId NOT IN (SELECT driverId FROM current_entrants)
""")

retirementDB.show(1500)

                                                                                

+--------------------+--------------------+---------------+
|            driverId|       constructorId|retirement_year|
+--------------------+--------------------+---------------+
|       roberto-merhi|            marussia|           2015|
|    pedro-de-la-rosa|                 hrt|           2012|
|    jerome-dambrosio|            lotus-f1|           2012|
|       nicolas-kiesa|             minardi|           2003|
|         charles-pic|            caterham|           2013|
|giancarlo-fisichella|             ferrari|           2009|
|giancarlo-fisichella|         force-india|           2009|
|    nelson-piquet-jr|             renault|           2009|
|            yuji-ide|         super-aguri|           2006|
|      nikita-mazepin|                haas|           2021|
|   markus-winkelhock|              spyker|           2007|
|      johnny-herbert|              jaguar|           2000|
|     kamui-kobayashi|            caterham|           2014|
|        daniil-kvyat|          alphatau