In [0]:
# Notebook parameter: declare a "p_data_source" text widget (empty default)
# and read its current value. The value is later written onto every row as
# the `data_source` column.
dbutils.widgets.text(name="p_data_source", defaultValue="")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
# Sanity check: list the raw landing folder. Use the configured path rather
# than a hard-coded mount so the listing always matches the folder that the
# reader below actually loads from.
display(dbutils.fs.ls(raw_folder_path))

path,name,size,modificationTime
dbfs:/mnt/bluetab1002/raw/circuits.csv,circuits.csv,10044,1738253538000
dbfs:/mnt/bluetab1002/raw/constructors.json,constructors.json,30415,1738253538000
dbfs:/mnt/bluetab1002/raw/drivers.json,drivers.json,180812,1738253538000
dbfs:/mnt/bluetab1002/raw/lap_times/,lap_times/,0,0
dbfs:/mnt/bluetab1002/raw/pit_stops.json,pit_stops.json,1369387,1738253539000
dbfs:/mnt/bluetab1002/raw/qualifying/,qualifying/,0,0
dbfs:/mnt/bluetab1002/raw/races.csv,races.csv,116847,1738253538000
dbfs:/mnt/bluetab1002/raw/results.json,results.json,7165641,1738253542000


In [0]:
# Confirm the input file exists before reading it. The path is built the same
# way as in the read cell, so this checks exactly the file that gets ingested.
display(dbutils.fs.ls(f"{raw_folder_path}/races.csv"))

path,name,size,modificationTime
dbfs:/mnt/bluetab1002/raw/races.csv,races.csv,116847,1738253538000


In [0]:
# Explicit schema for races.csv: column names and types are declared up front
# instead of being inferred from the file.
# NOTE(review): the printSchema output recorded in this notebook reports every
# column as nullable = true, so the nullable flags below do not appear to be
# enforced on read -- confirm before relying on them.
df_schema = (
    StructType()
    .add('raceId', IntegerType(), True)
    .add('year', IntegerType(), False)
    .add('round', IntegerType(), False)
    .add('circuitId', IntegerType(), True)
    .add('name', StringType(), False)
    .add('date', DateType(), False)
    .add('time', StringType(), False)
    .add('url', StringType(), False)
)

In [0]:
# Load the raw races file using the explicit schema; the first line of the
# file is treated as a header row.
df = spark.read.csv(
    f'{raw_folder_path}/races.csv',
    schema=df_schema,
    header=True,
)

In [0]:
# Shape the raw races data for the processed layer:
#   * rename raceId/circuitId to snake_case and year to race_year
#   * fold the separate date and time columns into one race_timestamp
#   * discard url, date and time
#   * tag every row with the data source supplied through the widget
df = (
    df.drop('url')
    .withColumnRenamed('raceId', 'race_id')
    .withColumnRenamed('circuitId', 'circuit_id')
    .withColumnRenamed('year', 'race_year')
    .withColumn(
        'race_timestamp',
        to_timestamp(
            concat(col("date"), lit(" "), col("time")),
            "yyyy-MM-dd HH:mm:ss",
        ),
    )
    .drop('date', 'time')
    .withColumn('data_source', lit(v_data_source))
)

# add_ingestion_date is not defined in this notebook -- presumably it comes
# from the included common_functions notebook (verify there).
df = add_ingestion_date(df)

# Show the transformed rows and the resulting schema for a quick visual check.
display(df)
df.printSchema()

race_id,race_year,round,circuit_id,name,race_timestamp,ingestion_date
1,2009,1,1,Australian Grand Prix,2009-03-29T06:00:00Z,2025-02-01T17:43:14.85Z
2,2009,2,2,Malaysian Grand Prix,2009-04-05T09:00:00Z,2025-02-01T17:43:14.85Z
3,2009,3,17,Chinese Grand Prix,2009-04-19T07:00:00Z,2025-02-01T17:43:14.85Z
4,2009,4,3,Bahrain Grand Prix,2009-04-26T12:00:00Z,2025-02-01T17:43:14.85Z
5,2009,5,4,Spanish Grand Prix,2009-05-10T12:00:00Z,2025-02-01T17:43:14.85Z
6,2009,6,6,Monaco Grand Prix,2009-05-24T12:00:00Z,2025-02-01T17:43:14.85Z
7,2009,7,5,Turkish Grand Prix,2009-06-07T12:00:00Z,2025-02-01T17:43:14.85Z
8,2009,8,9,British Grand Prix,2009-06-21T12:00:00Z,2025-02-01T17:43:14.85Z
9,2009,9,20,German Grand Prix,2009-07-12T12:00:00Z,2025-02-01T17:43:14.85Z
10,2009,10,11,Hungarian Grand Prix,2009-07-26T12:00:00Z,2025-02-01T17:43:14.85Z


root
 |-- race_id: integer (nullable = true)
 |-- race_year: integer (nullable = true)
 |-- round: integer (nullable = true)
 |-- circuit_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- race_timestamp: timestamp (nullable = true)
 |-- ingestion_date: timestamp (nullable = false)



In [0]:
# Persist the processed races as Parquet, partitioned by race_year, in
# overwrite mode.
(
    df.write
    .mode("overwrite")
    .partitionBy('race_year')
    .parquet(f"{process1_folder_path}/races")
)

In [0]:
# List the files that were just written. The path is built from the same
# configured folder as the write, instead of a hard-coded relative mount path
# (which also lacked a leading slash), so the listing cannot drift from the
# actual output location.
display(dbutils.fs.ls(f"{process1_folder_path}/races"))

path,name,size,modificationTime
dbfs:/mnt/bluetab1002/process1/races/_SUCCESS,_SUCCESS,0,1738431803000
dbfs:/mnt/bluetab1002/process1/races/_committed_3225151779131770414,_committed_3225151779131770414,35,1738431801000
dbfs:/mnt/bluetab1002/process1/races/race_year=1950/,race_year=1950/,0,1738324538000
dbfs:/mnt/bluetab1002/process1/races/race_year=1951/,race_year=1951/,0,1738324538000
dbfs:/mnt/bluetab1002/process1/races/race_year=1952/,race_year=1952/,0,1738324538000
dbfs:/mnt/bluetab1002/process1/races/race_year=1953/,race_year=1953/,0,1738324538000
dbfs:/mnt/bluetab1002/process1/races/race_year=1954/,race_year=1954/,0,1738324538000
dbfs:/mnt/bluetab1002/process1/races/race_year=1955/,race_year=1955/,0,1738324538000
dbfs:/mnt/bluetab1002/process1/races/race_year=1956/,race_year=1956/,0,1738324538000
dbfs:/mnt/bluetab1002/process1/races/race_year=1957/,race_year=1957/,0,1738324538000


In [0]:
# Report completion to a calling notebook/job. The status string was
# previously misspelled; any caller that compares the exit value should
# expect "Success".
dbutils.notebook.exit("Success")