## Reading table paths


In [0]:
file_paths = dbutils.fs.ls('mnt/bronze/dbo/')

In [0]:
def read_all_paths(file_paths):
    tablepaths = {}
    dataframes={}
    for file in file_paths:
        table_name = file.name.rstrip('/')
        print(table_name)
        path = '/mnt/bronze/dbo/' + table_name +'/' +table_name + '.parquet'
        tablepaths[table_name] = path
        dataframes[table_name] = spark.read.format('parquet').load(path)  
    return tablepaths,dataframes

## creating dictionary of dataframes using table paths

In [0]:
table_paths,dataframes_dict = read_all_paths(file_paths)

circuits
constructor_results
constructor_standings
constructors
driver_standings
drivers
lap_times
pit_stops
qualifying
races
results
seasons
sprint_results
status


In [0]:
from pyspark.sql.functions import col

result_list = []
for name, df in dataframes_dict.items():
    for column in df.columns:
        count = df.filter(col(column) == '\\N').count()
        if count > 0:
            result_list.append((name, column, count))

# Sort the result list by DataFrame name and then by column name
result_list.sort(key=lambda x: (x[0], x[1]))

In [0]:
print(result_list)

[('constructor_results', 'status', 12478), ('drivers', 'code', 757), ('drivers', 'number', 802), ('qualifying', 'q1', 154), ('qualifying', 'q2', 4567), ('qualifying', 'q3', 6703), ('races', 'fp1_date', 1035), ('races', 'fp1_time', 1057), ('races', 'fp2_date', 1035), ('races', 'fp2_time', 1057), ('races', 'fp3_date', 1053), ('races', 'fp3_time', 1072), ('races', 'quali_date', 1035), ('races', 'quali_time', 1057), ('races', 'sprint_date', 1107), ('races', 'sprint_time', 1110), ('races', 'time', 731), ('results', 'fastestLapTime', 18498), ('results', 'time', 18978), ('sprint_results', 'fastestLapTime', 9), ('sprint_results', 'time', 19)]


## Function to check if there any null values in the dataframe

In [0]:
from pyspark.sql import functions as F


def check_null_columns(table_df):
  null_cols = [col for col in table_df.columns if table_df.filter((F.col(col) == "\\N")|F.col(col).isNull()).count() > 0]
  return null_cols




## Transformations for Circuits table

In [0]:
null_dataframe_columns = check_null_columns(dataframes_dict['circuits'])

In [0]:
null_dataframe_columns

[]

In [0]:
from pyspark.sql.functions import upper, coalesce, lit,col
circuits_df = dataframes_dict['circuits']
circuits_df = circuits_df \
    .withColumn("name", upper(col("name"))) \
    .withColumn("location", upper(col("location"))) \
    .withColumn("country", upper(col("country"))) \
    .withColumn("alt", coalesce(col("alt"), lit(0)))  # Replace nulls with 0


In [0]:
circuits_df.show(77)

+---------+--------------+--------------------+--------------------+-------------+-------------------+-------------------+----+--------------------+
|circuitId|    circuitRef|                name|            location|      country|                lat|                lng| alt|                 url|
+---------+--------------+--------------------+--------------------+-------------+-------------------+-------------------+----+--------------------+
|        1|   albert_park|ALBERT PARK GRAND...|           MELBOURNE|    AUSTRALIA|-37.849700927734375| 144.96800231933594|  10|http://en.wikiped...|
|        2|        sepang|SEPANG INTERNATIO...|        KUALA LUMPUR|     MALAYSIA| 2.7608299255371094| 101.73799896240234|  18|http://en.wikiped...|
|        3|       bahrain|BAHRAIN INTERNATI...|              SAKHIR|      BAHRAIN| 26.032499313354492|  50.51060104370117|   7|http://en.wikiped...|
|        4|     catalunya|CIRCUIT DE BARCEL...|            MONTMELÓ|        SPAIN|  41.56999969482422| 2.2

## Transformations for Constructor Results Table 

In [0]:
from pyspark.sql.functions import when

constructor_results_df = dataframes_dict['constructor_results']
null_dataframe_columns = check_null_columns(constructor_results_df)



In [0]:
null_dataframe_columns

['points', 'status']

In [0]:
constructor_results_df = constructor_results_df \
    .withColumn("status", when(col("status") == "\\N", None).otherwise(col("status"))) \
    .withColumn("points",when(col("points") == "\\N", None).otherwise(col("points").cast("integer")))

In [0]:
from pyspark.sql.functions import coalesce, lit

constructor_results_df = constructor_results_df.withColumn("status", coalesce(col("status"), lit("Unknown")))
constructor_results_df = constructor_results_df.withColumn("points", coalesce(col("points"), lit(0)))


In [0]:

null_dataframe_columns = check_null_columns(constructor_results_df)
print(null_dataframe_columns)

[]


## Transformations for Constructor Standings Table 

In [0]:
null_dataframe_columns = check_null_columns(dataframes_dict['constructor_standings'])
print(null_dataframe_columns)

['positionText']


In [0]:
from pyspark.sql.functions import coalesce
constructor_standings_df = dataframes_dict['constructor_standings']


constructor_standings_df = constructor_standings_df \
    .withColumn("positionText", coalesce(col("positionText"), col("position")))


In [0]:

# constructor_results_df = dataframes_dict['constructor_results']
null_dataframe_columns = check_null_columns(constructor_standings_df)
print(null_dataframe_columns)

[]


## Transformations for Constructors Table

In [0]:
constructors_df = dataframes_dict['constructors']


In [0]:
null_dataframe_columns = check_null_columns(constructors_df)
print(null_dataframe_columns)

[]


In [0]:
constructors_df = constructors_df \
    .withColumn("name", upper(col("name"))) \
    .withColumn("nationality", upper(col("nationality")))


## Transformations for driver_standings table

In [0]:
driver_standings_df = dataframes_dict['driver_standings']


In [0]:
null_dataframe_columns = check_null_columns(driver_standings_df)
print(null_dataframe_columns)

['positionText']


In [0]:
driver_standings_df = driver_standings_df \
    .withColumn("positionText", coalesce(col("positionText"), col("position")))


## Transformations for drivers Table

In [0]:
drivers_df = dataframes_dict['drivers']

In [0]:
null_dataframe_columns = check_null_columns(drivers_df)
print(null_dataframe_columns)

['number', 'code']


In [0]:

drivers_df = drivers_df.withColumn("number", when((col("number").isNull()) | (col("number") == "\\N"), lit("Unknown")).otherwise(col("number"))) \
                       .withColumn("code", when((col("code").isNull()) | (col("code") == "\\N"), lit("Unknown")).otherwise(col("code")))


In [0]:
from pyspark.sql.functions import current_date, months_between, floor , to_date

drivers_df = drivers_df \
    .withColumn("forename", upper(col("forename"))) \
    .withColumn("surname", upper(col("surname"))) \
    .withColumn("dob", to_date(col("dob"), "yyyy-MM-dd"))


In [0]:
drivers_df.show(100)

+--------+------------------+-------+-------+---------------+-------------+----------+-----------+--------------------+
|driverId|         driverRef| number|   code|       forename|      surname|       dob|nationality|                 url|
+--------+------------------+-------+-------+---------------+-------------+----------+-----------+--------------------+
|       1|          hamilton|     44|    HAM|          LEWIS|     HAMILTON|1985-01-07|    British|http://en.wikiped...|
|       2|          heidfeld|Unknown|    HEI|           NICK|     HEIDFELD|1977-05-10|     German|http://en.wikiped...|
|       3|           rosberg|      6|    ROS|           NICO|      ROSBERG|1985-06-27|     German|http://en.wikiped...|
|       4|            alonso|     14|    ALO|       FERNANDO|       ALONSO|1981-07-29|    Spanish|http://en.wikiped...|
|       5|        kovalainen|Unknown|    KOV|         HEIKKI|   KOVALAINEN|1981-10-19|    Finnish|http://en.wikiped...|
|       6|          nakajima|Unknown|   

## Transformations for Laptimes Table

In [0]:
lap_times_df = dataframes_dict['lap_times']

In [0]:
lap_times_df.show(10)

+------+--------+---+--------+--------+------------+
|raceId|driverId|lap|position|    time|milliseconds|
+------+--------+---+--------+--------+------------+
|   841|      20|  1|       1|1:38.109|       98109|
|   841|      20|  2|       1|1:33.006|       93006|
|   841|      20|  3|       1|1:32.713|       92713|
|   841|      20|  4|       1|1:32.803|       92803|
|   841|      20|  5|       1|1:32.342|       92342|
|   841|      20|  6|       1|1:32.605|       92605|
|   841|      20|  7|       1|1:32.502|       92502|
|   841|      20|  8|       1|1:32.537|       92537|
|   841|      20|  9|       1|1:33.240|       93240|
|   841|      20| 10|       1|1:32.572|       92572|
+------+--------+---+--------+--------+------------+
only showing top 10 rows



In [0]:
null_dataframe_columns = check_null_columns(lap_times_df)
print(null_dataframe_columns)

[]


In [0]:
lap_times_df.show()

+------+--------+---+--------+--------+------------+
|raceId|driverId|lap|position|    time|milliseconds|
+------+--------+---+--------+--------+------------+
|   841|      20|  1|       1|1:38.109|       98109|
|   841|      20|  2|       1|1:33.006|       93006|
|   841|      20|  3|       1|1:32.713|       92713|
|   841|      20|  4|       1|1:32.803|       92803|
|   841|      20|  5|       1|1:32.342|       92342|
|   841|      20|  6|       1|1:32.605|       92605|
|   841|      20|  7|       1|1:32.502|       92502|
|   841|      20|  8|       1|1:32.537|       92537|
|   841|      20|  9|       1|1:33.240|       93240|
|   841|      20| 10|       1|1:32.572|       92572|
|   841|      20| 11|       1|1:32.669|       92669|
|   841|      20| 12|       1|1:32.902|       92902|
|   841|      20| 13|       1|1:33.698|       93698|
|   841|      20| 14|       3|1:52.075|      112075|
|   841|      20| 15|       4|1:38.385|       98385|
|   841|      20| 16|       2|1:31.548|       

In [0]:
from pyspark.sql.functions import unix_timestamp, to_timestamp, regexp_replace

lap_times_df = lap_times_df \
    .withColumn("time_seconds", unix_timestamp(to_timestamp(col("time"), "m:ss.SSS")))


In [0]:
lap_times_df.show(10)

+------+--------+---+--------+--------+------------+------------+
|raceId|driverId|lap|position|    time|milliseconds|time_seconds|
+------+--------+---+--------+--------+------------+------------+
|   841|      20|  1|       1|1:38.109|       98109|          98|
|   841|      20|  2|       1|1:33.006|       93006|          93|
|   841|      20|  3|       1|1:32.713|       92713|          92|
|   841|      20|  4|       1|1:32.803|       92803|          92|
|   841|      20|  5|       1|1:32.342|       92342|          92|
|   841|      20|  6|       1|1:32.605|       92605|          92|
|   841|      20|  7|       1|1:32.502|       92502|          92|
|   841|      20|  8|       1|1:32.537|       92537|          92|
|   841|      20|  9|       1|1:33.240|       93240|          93|
|   841|      20| 10|       1|1:32.572|       92572|          92|
+------+--------+---+--------+--------+------------+------------+
only showing top 10 rows



## Transformations for pit_stops Table

In [0]:
pit_stops_df = dataframes_dict['pit_stops'] 

In [0]:
null_dataframe_columns = check_null_columns(pit_stops_df)
print(null_dataframe_columns)

['duration']


In [0]:
null_col =pit_stops_df.filter((col("duration").isNull())).show()


+------+--------+----+---+-------------------+--------+------------+
|raceId|driverId|stop|lap|               time|duration|milliseconds|
+------+--------+----+---+-------------------+--------+------------+
|   853|     817|   1|  1|1970-01-01 14:08:50|    NULL|     1004718|
|   881|     814|   2| 20|1970-01-01 16:41:12|    NULL|      123124|
|   881|      16|   2| 22|1970-01-01 16:44:43|    NULL|       89401|
|   881|      18|   3| 35|1970-01-01 17:06:47|    NULL|      104833|
|   884|      16|   1|  8|1970-01-01 14:15:56|    NULL|       74026|
|   888|      17|   1|  8|1970-01-01 14:16:48|    NULL|      162042|
|   893|     154|   3| 33|1970-01-01 21:11:23|    NULL|       73009|
|   894|      16|   3| 38|1970-01-01 16:16:46|    NULL|       71445|
|   901|     817|   3| 40|1970-01-01 17:16:28|    NULL|      109329|
|   903|      13|   1| 10|1970-01-01 15:21:37|    NULL|       78277|
|   905|      20|   1|  4|1970-01-01 14:10:25|    NULL|       66065|
|   914|       3|   1|  2|1970-01-

In [0]:
from pyspark.sql.functions import col, when

# fill 'duration' where it is currently null, using the 'milliseconds' column
pit_stops_df = pit_stops_df.withColumn("duration",
                                     when(col("duration").isNull(),
                                          col("milliseconds") / 1000)
                                     .otherwise(col("duration")))


## Transformation for Qualifying Table 

##### Null Handling: Replace nulls in 'q1', 'q2', and 'q3' with "DNP" (Did Not Participate) to clarify non-participation or disqualifications for visualization purposes.
##### Binary Flags: Create flags (q1_recorded, q2_recorded, q3_recorded) to indicate recorded times, aiding in machine learning model accuracy.

In [0]:
qualifying_df = dataframes_dict['qualifying'] 

In [0]:
null_dataframe_columns = check_null_columns(qualifying_df)
print(null_dataframe_columns)

['q1', 'q2', 'q3']


In [0]:
from pyspark.sql.functions import col, when
# Assuming 'qualifying_df' is your DataFrame
qualifying_df = qualifying_df \
    .withColumn("Q1", when((col("Q1").isNull()) | (col("Q1") == "\\N") | (col("Q1") == "None"), 0).otherwise(col("Q1"))) \
    .withColumn("Q2", when((col("Q2").isNull()) | (col("Q2") == "\\N") | (col("Q2") == "None"), 0).otherwise(col("Q2"))) \
    .withColumn("Q3", when((col("Q3").isNull()) | (col("Q3") == "\\N") | (col("Q3") == "None"), 0).otherwise(col("Q3")))



In [0]:
null_df = qualifying_df.filter((col("driverId") == 13) & (col("raceId") == 59))


In [0]:
null_df.show()

+---------+------+--------+-------------+------+--------+---+---+---+
|qualifyId|raceId|driverId|constructorId|number|position| Q1| Q2| Q3|
+---------+------+--------+-------------+------+--------+---+---+---+
|      895|    59|      13|            6|     6|      21|  0|  0|  0|
+---------+------+--------+-------------+------+--------+---+---+---+



In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType

def time_to_seconds(time_str):
    try:
        if time_str in ("0", "\\N", "None", None, ""):  # Handling expected non-participations and nulls
            return 0.0
        parts = time_str.split(':')
        if len(parts) == 2:
            minutes = float(parts[0])
            seconds = float(parts[1])
            total_seconds = minutes * 60 + seconds
            return total_seconds
        else:
            return 0.0  # Return 0.0 for any unexpected format
    except Exception as e:
        print(f"Failed to convert time: {time_str} with error: {str(e)}")
        return 0.0  # Returning 0.0 in case of any error during conversion

time_to_seconds_udf = udf(time_to_seconds, FloatType())

# Applying the UDF to convert times to seconds
qualifying_df = qualifying_df \
    .withColumn("Q1_seconds", time_to_seconds_udf(col("Q1"))) \
    .withColumn("Q2_seconds", time_to_seconds_udf(col("Q2"))) \
    .withColumn("Q3_seconds", time_to_seconds_udf(col("Q3")))


In [0]:
qualifying_df.show(100)

+---------+------+--------+-------------+------+--------+--------+--------+--------+----------+----------+----------+
|qualifyId|raceId|driverId|constructorId|number|position|      Q1|      Q2|      Q3|Q1_seconds|Q2_seconds|Q3_seconds|
+---------+------+--------+-------------+------+--------+--------+--------+--------+----------+----------+----------+
|        1|    18|       1|            1|    22|       1|1:26.572|1:25.187|1:26.714|    86.572|    85.187|    86.714|
|        2|    18|       9|            2|     4|       2|1:26.103|1:25.315|1:26.869|    86.103|    85.315|    86.869|
|        3|    18|       5|            1|    23|       3|1:25.664|1:25.452|1:27.079|    85.664|    85.452|    87.079|
|        4|    18|      13|            6|     2|       4|1:25.994|1:25.691|1:27.178|    85.994|    85.691|    87.178|
|        5|    18|       2|            2|     3|       5|1:25.960|1:25.518|1:27.236|     85.96|    85.518|    87.236|
|        6|    18|      15|            7|    11|       6

#### Transformations for ML modelling.

In [0]:
from pyspark.sql.functions import when

qualifying_df = qualifying_df.withColumn("Q_flag",
    when((col("Q1_seconds") != 0) & (col("Q2_seconds") != 0) & (col("Q3_seconds") != 0), "All Qualifyings")
    .when((col("Q1_seconds") != 0) & (col("Q2_seconds") != 0) & (col("Q3") == 0), "Q1 and Q2")
    .when((col("Q1_seconds") != 0) & (col("Q2_seconds") == 0), "Q1 Only")
    .otherwise("Did Not Participate"))


In [0]:
qualifying_df.show(200)

+---------+------+--------+-------------+------+--------+--------+--------+--------+----------+----------+----------+-------------------+
|qualifyId|raceId|driverId|constructorId|number|position|      Q1|      Q2|      Q3|Q1_seconds|Q2_seconds|Q3_seconds|             Q_flag|
+---------+------+--------+-------------+------+--------+--------+--------+--------+----------+----------+----------+-------------------+
|        1|    18|       1|            1|    22|       1|1:26.572|1:25.187|1:26.714|    86.572|    85.187|    86.714|    All Qualifyings|
|        2|    18|       9|            2|     4|       2|1:26.103|1:25.315|1:26.869|    86.103|    85.315|    86.869|    All Qualifyings|
|        3|    18|       5|            1|    23|       3|1:25.664|1:25.452|1:27.079|    85.664|    85.452|    87.079|    All Qualifyings|
|        4|    18|      13|            6|     2|       4|1:25.994|1:25.691|1:27.178|    85.994|    85.691|    87.178|    All Qualifyings|
|        5|    18|       2|       

In [0]:
from pyspark.sql.functions import col

# Assuming the times are already converted to seconds or a comparable unit
qualifying_df = qualifying_df \
    .withColumn("Q1_to_Q2_diff", col("Q1_seconds") - col("Q2_seconds")) \
    .withColumn("Q2_to_Q3_diff", col("Q2_seconds") - col("Q3_seconds")) \
    .withColumn("Q1_to_Q3_diff", col("Q1_seconds") - col("Q3_seconds"))


In [0]:
qualifying_df.show(10)

+---------+------+--------+-------------+------+--------+--------+--------+--------+----------+----------+----------+---------------+-------------+-------------+-------------+
|qualifyId|raceId|driverId|constructorId|number|position|      Q1|      Q2|      Q3|Q1_seconds|Q2_seconds|Q3_seconds|         Q_flag|Q1_to_Q2_diff|Q2_to_Q3_diff|Q1_to_Q3_diff|
+---------+------+--------+-------------+------+--------+--------+--------+--------+----------+----------+----------+---------------+-------------+-------------+-------------+
|        1|    18|       1|            1|    22|       1|1:26.572|1:25.187|1:26.714|    86.572|    85.187|    86.714|All Qualifyings|    1.3850021|   -1.5270004|  -0.14199829|
|        2|    18|       9|            2|     4|       2|1:26.103|1:25.315|1:26.869|    86.103|    85.315|    86.869|All Qualifyings|    0.7879944|   -1.5540009|  -0.76600647|
|        3|    18|       5|            1|    23|       3|1:25.664|1:25.452|1:27.079|    85.664|    85.452|    87.079|All

## Transformations for races Tables

In [0]:
races_df = dataframes_dict['races'] 

In [0]:
races_df = races_df \
    .withColumn("fp1_date", when(col("fp1_date") == "\\N", None).otherwise(col("fp1_date"))) \
    .withColumn("fp1_time", when(col("fp1_time") == "\\N", None).otherwise(col("fp1_time"))) \
    .withColumn("fp2_date", when(col("fp2_date") == "\\N", None).otherwise(col("fp2_date"))) \
    .withColumn("fp2_time", when(col("fp2_time") == "\\N", None).otherwise(col("fp2_time"))) \
    .withColumn("fp3_date", when(col("fp3_date") == "\\N", None).otherwise(col("fp3_date"))) \
    .withColumn("fp3_time", when(col("fp3_time") == "\\N", None).otherwise(col("fp3_time"))) \
    .withColumn("quali_date", when(col("quali_date") == "\\N", None).otherwise(col("quali_date"))) \
    .withColumn("quali_time", when(col("quali_time") == "\\N", None).otherwise(col("quali_time"))) \
    .withColumn("sprint_date", when(col("sprint_date") == "\\N", None).otherwise(col("sprint_date"))) \
    .withColumn("sprint_time", when(col("sprint_time") == "\\N", None).otherwise(col("sprint_time"))) \
    .withColumn("time", when(col("time") == "\\N", None).otherwise(col("time")))


In [0]:
races_df.show(3000)

+------+----+-----+---------+--------------------+----------+--------+--------------------+----------+--------+----------+--------+----------+--------+----------+----------+-----------+-----------+
|raceId|year|round|circuitId|                name|      date|    time|                 url|  fp1_date|fp1_time|  fp2_date|fp2_time|  fp3_date|fp3_time|quali_date|quali_time|sprint_date|sprint_time|
+------+----+-----+---------+--------------------+----------+--------+--------------------+----------+--------+----------+--------+----------+--------+----------+----------+-----------+-----------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|      NULL|    NULL|      NULL|    NULL|      NULL|    NULL|      NULL|      NULL|       NULL|       NULL|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|      NULL|    NULL|      NULL|    NULL|      NULL|    NULL|      NULL|      NULL|       NULL|       NULL|
|     3|20

In [0]:
races_df = races_df.fillna({"time":"Unknown", "fp1_date": "Unknown", "fp1_time": "Unknown", "fp2_date": "Unknown", "fp2_time": "Unknown", "fp3_date": "Unknown", "fp3_time": "Unknown", "quali_date": "Unknown", "quali_time": "Unknown", "sprint_date": "Unknown", "sprint_time": "Unknown"})


In [0]:
races_df.show(3000)

+------+----+-----+---------+--------------------+----------+--------+--------------------+----------+--------+----------+--------+----------+--------+----------+----------+-----------+-----------+
|raceId|year|round|circuitId|                name|      date|    time|                 url|  fp1_date|fp1_time|  fp2_date|fp2_time|  fp3_date|fp3_time|quali_date|quali_time|sprint_date|sprint_time|
+------+----+-----+---------+--------------------+----------+--------+--------------------+----------+--------+----------+--------+----------+--------+----------+----------+-----------+-----------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|   Unknown| Unknown|   Unknown| Unknown|   Unknown| Unknown|   Unknown|   Unknown|    Unknown|    Unknown|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|   Unknown| Unknown|   Unknown| Unknown|   Unknown| Unknown|   Unknown|   Unknown|    Unknown|    Unknown|
|     3|20

### creating race sequences with in each season

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, lit, to_timestamp, concat, rank

windowSpec = Window.partitionBy("year").orderBy("date")
races_df = races_df.withColumn("race_sequence", rank().over(windowSpec))

In [0]:
races_df.show(10)

+------+----+-----+---------+------------------+----------+-------+--------------------+--------+--------+--------+--------+--------+--------+----------+----------+-----------+-----------+-------------+
|raceId|year|round|circuitId|              name|      date|   time|                 url|fp1_date|fp1_time|fp2_date|fp2_time|fp3_date|fp3_time|quali_date|quali_time|sprint_date|sprint_time|race_sequence|
+------+----+-----+---------+------------------+----------+-------+--------------------+--------+--------+--------+--------+--------+--------+----------+----------+-----------+-----------+-------------+
|   833|1950|    1|        9|British Grand Prix|1950-05-13|Unknown|http://en.wikiped...| Unknown| Unknown| Unknown| Unknown| Unknown| Unknown|   Unknown|   Unknown|    Unknown|    Unknown|            1|
|   834|1950|    2|        6| Monaco Grand Prix|1950-05-21|Unknown|http://en.wikiped...| Unknown| Unknown| Unknown| Unknown| Unknown| Unknown|   Unknown|   Unknown|    Unknown|    Unknown|

## TRansformations for results

In [0]:
results_df = dataframes_dict['results'] 

In [0]:
from pyspark.sql.functions import col



# Assume 'df' is your DataFrame and 'column1' and 'column2' are the names of the columns you want to compare
df_with_comparison = results_df.withColumn("are_equal", col("position") == col("positionText"))

# Show results where columns are equal
df_with_comparison.filter(col("are_equal") == True).show()
df_with_comparison.filter(col("are_equal") == False).show()



+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+------------------+--------+---------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|   fastestLapSpeed|statusId|are_equal|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+------------------+--------+---------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|    10|  58|1:34:50.616|     5690616|        39|   2|      1:27.452| 218.3000030517578|       1|     true|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|     8|  58|     +5.478|     5696094|        41|   3|      1:27.739|217.58599853515625|       1|     true|
|       3|    18|       3

In [0]:
from pyspark.sql.functions import col, when

# Assuming 'df' is your DataFrame
results_df = results_df.withColumn("position", when(col("position").isNull(), col("positionOrder")).otherwise(col("position")))
results_df = results_df.withColumn("positionText", when(col("positionText").isNull(), col("positionOrder")).otherwise(col("positionText")))

# Show some results to verify
results_df.show(1000)


+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+------------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|   fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+------------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|    10|  58|1:34:50.616|     5690616|        39|   2|      1:27.452| 218.3000030517578|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|     8|  58|     +5.478|     5696094|        41|   3|      1:27.739|217.58599853515625|       1|
|       3|    18|       3|            3|     7|   7|       3|           3| 

In [0]:
results_df.show(100)

+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+------------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|   fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+------------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|    10|  58|1:34:50.616|     5690616|        39|   2|      1:27.452| 218.3000030517578|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|     8|  58|     +5.478|     5696094|        41|   3|      1:27.739|217.58599853515625|       1|
|       3|    18|       3|            3|     7|   7|       3|           3| 

In [0]:
from pyspark.sql.functions import col, when, lit

# Assuming 'df' is your DataFrame
results_df = results_df.withColumn("time", when(col("time") == "\\N", lit(0)).otherwise(col("time")))
results_df = results_df.withColumn("milliseconds", when(col("milliseconds").isNull(), lit(0)).otherwise(col("milliseconds")))
results_df = results_df.withColumn("fastestLap", when(col("fastestLap").isNull(), lit(0)).otherwise(col("fastestLap")))
results_df = results_df.withColumn("rank", when(col("rank").isNull(), lit(0)).otherwise(col("rank")))
results_df = results_df.withColumn("fastestLapTime", when(col("fastestLapTime")== "\\N", lit(0)).otherwise(col("fastestLapTime")))
results_df = results_df.withColumn("fastestLapSpeed", when(col("fastestLapSpeed").isNull(), lit(0)).otherwise(col("fastestLapSpeed")))






# Show some results to verify
results_df.show()


+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+------------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|   fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+------------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|    10|  58|1:34:50.616|     5690616|        39|   2|      1:27.452| 218.3000030517578|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|     8|  58|     +5.478|     5696094|        41|   3|      1:27.739|217.58599853515625|       1|
|       3|    18|       3|            3|     7|   7|       3|           3| 

In [0]:
results_df.show(100)


+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+------------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|   fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+------------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|    10|  58|1:34:50.616|     5690616|        39|   2|      1:27.452| 218.3000030517578|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|     8|  58|     +5.478|     5696094|        41|   3|      1:27.739|217.58599853515625|       1|
|       3|    18|       3|            3|     7|   7|       3|           3| 

## Transformations for seasons table

In [0]:
seasons_df = dataframes_dict['seasons'] 

In [0]:
null_dataframe_columns = check_null_columns(seasons_df)
print(null_dataframe_columns)

[]


## Transformations of sprint_results

In [0]:
sprint_results_df = dataframes_dict['sprint_results'] 

In [0]:
null_dataframe_columns = check_null_columns(sprint_results_df)
print(null_dataframe_columns)

['position', 'positionText', 'time', 'milliseconds', 'fastestLap', 'fastestLapTime']


In [0]:
sprint_results_df = sprint_results_df.withColumn("position", when(col("position").isNull(), col("positionOrder")).otherwise(col("position")))
sprint_results_df = sprint_results_df.withColumn("positionText", when(col("positionText").isNull(), col("positionOrder")).otherwise(col("positionText")))

In [0]:
sprint_results_df.show(100)

+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+---------+------------+----------+--------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|     time|milliseconds|fastestLap|fastestLapTime|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+---------+------------+----------+--------------+--------+
|       1|  1061|     830|            9|    33|   2|       1|           1|            1|     3|  17|25:38.426|     1538426|        14|      1:30.013|       1|
|       2|  1061|       1|          131|    44|   1|       2|           2|            2|     2|  17|   +1.430|     1539856|        17|      1:29.937|       1|
|       3|  1061|     822|          131|    77|   3|       3|           3|            3|     1|  17|   +7.502|     1545928|        17|      1:29.958|       1|
|       4|  1061|     844|            6|    16

In [0]:
from pyspark.sql.functions import col, when, lit

# Assuming 'df' is your DataFrame
sprint_results_df = sprint_results_df.withColumn("time", when(col("time") == "\\N", lit(0)).otherwise(col("time")))
sprint_results_df = sprint_results_df.withColumn("milliseconds", when(col("milliseconds").isNull(), lit(0)).otherwise(col("milliseconds")))
sprint_results_df = sprint_results_df.withColumn("fastestLap", when(col("fastestLap").isNull(), lit(0)).otherwise(col("fastestLap")))
sprint_results_df = sprint_results_df.withColumn("fastestLapTime", when(col("fastestLapTime")== "\\N", lit(0)).otherwise(col("fastestLapTime")))
# results_df = results_df.withColumn("fastestLapSpeed", when(col("fastestLapSpeed").isNull(), lit(0)).otherwise(col("fastestLapSpeed")))






# Show some results to verify
sprint_results_df.show()


+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+---------+------------+----------+--------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|     time|milliseconds|fastestLap|fastestLapTime|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+---------+------------+----------+--------------+--------+
|       1|  1061|     830|            9|    33|   2|       1|           1|            1|     3|  17|25:38.426|     1538426|        14|      1:30.013|       1|
|       2|  1061|       1|          131|    44|   1|       2|           2|            2|     2|  17|   +1.430|     1539856|        17|      1:29.937|       1|
|       3|  1061|     822|          131|    77|   3|       3|           3|            3|     1|  17|   +7.502|     1545928|        17|      1:29.958|       1|
|       4|  1061|     844|            6|    16

## Transformations for status table

In [0]:
status_df =  dataframes_dict['status'] 

In [0]:
null_dataframe_columns = check_null_columns(status_df)
print(null_dataframe_columns)

[]


In [0]:
# Example dictionary of DataFrames
dataframes_dict = {
    'circuits': circuits_df,
    'constructor_results': constructor_results_df,
    'constructor_standings': constructor_standings_df,
    'constructors': constructors_df,
    'driver_standings': driver_standings_df,
    'drivers': drivers_df,
    'laptimes': lap_times_df,
    'pitstops': pit_stops_df,
    'qualifying': qualifying_df,
    'races': races_df,
    'results': results_df,
    'seasons': seasons_df,
    'sprint_results': sprint_results_df,
    'status': status_df
}


## Writing Transformed data in the Silver Container

In [0]:

new_directory ='mnt/silver/dbo'
dbutils.fs.mkdirs(new_directory)


True

In [0]:
silver_path = dbutils.fs.ls('mnt/silver')

In [0]:
print(silver_path)

[FileInfo(path='dbfs:/mnt/silver/dbo/', name='dbo/', size=0, modificationTime=1724205956000)]


In [0]:
for table_name, df in dataframes_dict.items():
    output_path = f'/mnt/silver/dbo/{table_name}/'
    df.write.format('delta').mode('overwrite').save(output_path)
    print(f"Written {table_name} to {output_path}")


Written circuits to /mnt/silver/dbo/circuits/
Written constructor_results to /mnt/silver/dbo/constructor_results/
Written constructor_standings to /mnt/silver/dbo/constructor_standings/
Written constructors to /mnt/silver/dbo/constructors/
Written driver_standings to /mnt/silver/dbo/driver_standings/
Written drivers to /mnt/silver/dbo/drivers/
Written laptimes to /mnt/silver/dbo/laptimes/
Written pitstops to /mnt/silver/dbo/pitstops/
Written qualifying to /mnt/silver/dbo/qualifying/
Written races to /mnt/silver/dbo/races/
Written results to /mnt/silver/dbo/results/
Written seasons to /mnt/silver/dbo/seasons/
Written sprint_results to /mnt/silver/dbo/sprint_results/
Written status to /mnt/silver/dbo/status/
