In [0]:
# Inspect the workspace's DBFS mount points. The project's ADLS containers
# are expected under /mnt/covid19projectdatalake/ (raw and processed).
mount_infos = dbutils.fs.mounts()
mount_infos

Out[1]: [MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/mnt/covid19projectdatalake/processed', source='abfss://processed@covid19projectdatalake.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/mnt/covid19projectdatalake/raw', source='abfss://raw@covid19projectdatalake.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType='')]

In [0]:
# List the raw landing folder to confirm the source files are present
# before ingesting races.csv.
raw_folder_path = "/mnt/covid19projectdatalake/raw"
raw_listing = dbutils.fs.ls(raw_folder_path)
display(raw_listing)

path,name,size,modificationTime
dbfs:/mnt/covid19projectdatalake/raw/circuits.csv,circuits.csv,10044,1683029856000
dbfs:/mnt/covid19projectdatalake/raw/constructors.json,constructors.json,30415,1683029857000
dbfs:/mnt/covid19projectdatalake/raw/drivers.json,drivers.json,180812,1683029857000
dbfs:/mnt/covid19projectdatalake/raw/lap_times/,lap_times/,0,1683029922000
dbfs:/mnt/covid19projectdatalake/raw/pit_stops.json,pit_stops.json,1369387,1683029864000
dbfs:/mnt/covid19projectdatalake/raw/qualifying/,qualifying/,0,1683029980000
dbfs:/mnt/covid19projectdatalake/raw/races.csv,races.csv,116847,1683029858000
dbfs:/mnt/covid19projectdatalake/raw/results.json,results.json,7165641,1683029883000


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

In [0]:
# Explicit schema for raw/races.csv, in file column order.
# - raceId is declared non-nullable; every other column is nullable.
#   NOTE(review): Spark file sources generally relax columns to nullable on
#   read, so the False flag is likely not enforced here; confirm if relied on.
# - `date` is parsed as a DATE, while `time` stays a string so it can be
#   concatenated with `date` and parsed into a timestamp afterwards.
_races_columns = (
    ("raceId", IntegerType(), False),
    ("year", IntegerType(), True),
    ("round", IntegerType(), True),
    ("circuitId", IntegerType(), True),
    ("name", StringType(), True),
    ("date", DateType(), True),
    ("time", StringType(), True),
    ("url", StringType(), True),
)
races_schema = StructType(
    fields=[StructField(column_name, column_type, is_nullable)
            for column_name, column_type, is_nullable in _races_columns]
)

In [0]:
# Load the raw races CSV. The first row holds column names, and the explicit
# schema gives typed columns instead of reading every field as a string.
races_df = (
    spark.read
    .format("csv")
    .schema(races_schema)
    .option("header", True)
    .load("dbfs:/mnt/covid19projectdatalake/raw/races.csv")
)

In [0]:
from pyspark.sql.functions import current_timestamp, to_timestamp, concat, lit, col

In [0]:
# Add two columns:
# - ingestion_date: the moment this job processed the rows;
# - race_timestamp: `date` + " " + `time` parsed with the pattern below.
#   If `time` is null, concat yields null and so does the timestamp.
race_datetime_text = concat(col("date"), lit(" "), col("time"))

races_with_timestamp_df = (
    races_df
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("race_timestamp", to_timestamp(race_datetime_text, "yyyy-MM-dd HH:mm:ss"))
)

In [0]:
# Project the columns kept in the processed layer and rename the source's
# camelCase identifiers to snake_case. `date` and `time` are dropped in favour
# of `race_timestamp`; `url` is dropped as well.
# Fix: the year column was previously aliased as the misspelled "race_yaer".
# The processed/races output is written in overwrite mode, so the old column
# name disappears on the next run; update any reader that used "race_yaer".
races_selected_df = races_with_timestamp_df.select(
    col("raceId").alias("race_id"),
    col("year").alias("race_year"),
    col("round"),
    col("circuitId").alias("circuit_id"),
    col("name"),
    col("ingestion_date"),
    col("race_timestamp"),
)

In [0]:
# Persist the curated races data to the processed zone as Parquet,
# replacing whatever a previous run wrote to the same folder.
races_selected_df.write.parquet(
    "/mnt/covid19projectdatalake/processed/races",
    mode="overwrite",
)