In [0]:
import pandas as pd

# Root folder holding every raw source file read by this notebook.
# NOTE(review): this is an absolute, user-specific workspace path; consider
# moving it to a widget/config value so the notebook runs for other users.
base = "/Workspace/Users/p70madhu@gmail.com/motorsport_project/data/"

# The lap-time CSVs ship without a header row, so column names are supplied here.
lap_columns = ['raceId', 'driverId', 'lap', 'position', 'time', 'milliseconds']

# Number of lap_times_split_<n>.csv files (numbered from 1) that make up the data set.
LAP_SPLIT_COUNT = 5


def _read_lap_split(split_number):
    """Load one headerless lap-times split file as a Spark DataFrame.

    Args:
        split_number: 1-based index used in the file name
            ``lap_times_split_<split_number>.csv`` under ``base``.

    Returns:
        A Spark DataFrame whose columns are named after ``lap_columns``.
    """
    split_path = f"{base}lap_times_split_{split_number}.csv"
    return spark.createDataFrame(pd.read_csv(split_path, header=None, names=lap_columns))


lap_splits = [_read_lap_split(n) for n in range(1, LAP_SPLIT_COUNT + 1)]

# Keep the per-split names bound in case later cells still reference them.
lap1, lap2, lap3, lap4, lap5 = lap_splits

# Stack the splits in file order; union() matches columns by position, which is
# safe here because every split is read with the same lap_columns list.
lap_all = lap_splits[0]
for lap_split in lap_splits[1:]:
    lap_all = lap_all.union(lap_split)

# Replace the bronze table (and its schema) on every run so the cell is re-runnable.
(
    lap_all.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable("bronze_lap_times")
)

In [0]:
# Load the circuits and races CSVs (their first row is the header) and persist
# each one as a bronze Delta table, replacing any previous table and schema.
csv_to_table = {
    "circuits.csv": "bronze_circuits",
    "races.csv": "bronze_races",
}

for csv_name, table_name in csv_to_table.items():
    source_pdf = pd.read_csv(base + csv_name, header=0)
    bronze_writer = spark.createDataFrame(source_pdf).write.format("delta").mode("overwrite")
    bronze_writer.option("overwriteSchema", True).saveAsTable(table_name)

# Confirmation messages are emitted only after both writes have finished.
print("✓ bronze_circuits table created successfully")
print("✓ bronze_races table created successfully")

✓ bronze_circuits table created successfully
✓ bronze_races table created successfully


In [0]:
# Ingest the line-delimited JSON sources (drivers, constructors, results) into
# bronze Delta tables with every column stored as a nullable string.

# Literal backslash-N text that the source files use to mark a missing value.
NULL_MARKER = '\\N'


def _stringify_preserving_nulls(pdf):
    """Return a copy of ``pdf`` with each value cast to ``str`` and nulls kept as ``None``.

    A plain ``DataFrame.astype(str)`` would turn ``None``/``NaN`` into the
    literal strings ``'None'``/``'nan'``, so nulls would never reach the table.
    Here the ``NULL_MARKER`` text and scalar missing values become ``None``,
    and every other value (including nested dicts/lists) becomes ``str(value)``.

    Args:
        pdf: pandas DataFrame as returned by ``pd.read_json``.

    Returns:
        A new pandas DataFrame with the same columns and index, object dtype,
        containing only ``str`` and ``None`` values.
    """
    def to_text(value):
        # is_scalar() guards pd.isna(), which returns an array for list-likes.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        if isinstance(value, str) and value == NULL_MARKER:
            return None
        return str(value)

    return pd.DataFrame({
        column: pd.Series([to_text(v) for v in pdf[column]], index=pdf.index, dtype=object)
        for column in pdf.columns
    })


def _write_bronze_strings(pdf, table_name):
    """Overwrite Delta table ``table_name`` with ``pdf``, typing every column as string.

    The schema is passed explicitly (as a DDL string) so that a column holding
    only nulls does not depend on Spark's type inference.
    """
    string_schema = ", ".join(f"`{column}` string" for column in pdf.columns)
    (
        spark.createDataFrame(pdf, schema=string_schema)
        .write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", True)
        .saveAsTable(table_name)
    )


# Mixed-type columns are normalised to strings to avoid Arrow conversion errors,
# while missing values stay as real nulls.
drivers_df = _stringify_preserving_nulls(pd.read_json(base + "drivers.json", lines=True))
constructors_df = _stringify_preserving_nulls(pd.read_json(base + "constructors.json", lines=True))
results_df = _stringify_preserving_nulls(pd.read_json(base + "results.json", lines=True))

_write_bronze_strings(drivers_df, "bronze_drivers")
_write_bronze_strings(constructors_df, "bronze_constructors")
_write_bronze_strings(results_df, "bronze_results")