##### Ingesting circuits.csv

In [0]:
%run
"../includes/configurations"

In [0]:
%run
"../includes/common_functions"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [0]:
# Explicit schema for circuits.csv so Spark does not have to infer column types.
# Every column is nullable (the StructField default) except circuitId.
# NOTE(review): file-based readers may still treat circuitId as nullable when
# loading; confirm if non-null enforcement is actually required.
circuits_schema = StructType(fields=[
    StructField('circuitId', IntegerType(), nullable=False),
    StructField('circuitRef', StringType()),
    StructField('name', StringType()),
    StructField('location', StringType()),
    StructField('country', StringType()),
    StructField('lat', DoubleType()),
    StructField('lng', DoubleType()),
    StructField('alt', IntegerType()),
    StructField('url', StringType()),
])

In [0]:
# Path of the raw circuits file.
# RAW_FOLDER_PATH is not defined in this notebook; presumably it comes from
# the "../includes/configurations" include run above.
file = f'{RAW_FOLDER_PATH}/circuits.csv'

# Read with the explicit schema instead of inferSchema=True. Schema inference
# requires an extra pass over the whole file, which is slow for large inputs
# and therefore not recommended for production. For quick exploration only:
# circuits_df = spark.read.csv(file, header=True, inferSchema=True)
circuits_df = (
    spark.read
    .schema(circuits_schema)
    .option('header', True)
    .csv(file)
)

##### Selecting and renaming relevant columns

In [0]:
# Alternative: select by plain column-name strings instead of col() objects.
# circuits_selected_df = circuits_df.select('circuitId', 'circuitRef', 'name', 'location', 'country', 'lat', 'lng', 'alt')

In [0]:
from pyspark.sql.functions import col

In [0]:
# Keep every schema column except 'url', which is not selected here and is
# therefore dropped from the downstream DataFrames.
circuits_selected_df = circuits_df.select(
    col('circuitId'),
    col('circuitRef'),
    col('name'),
    col('location'),
    col('country'),
    col('lat'),
    col('lng'),
    col('alt'),
)

In [0]:
# Rename the camelCase / abbreviated source columns to descriptive snake_case
# names; 'name', 'location' and 'country' are left unchanged.
circuits_renamed_df = (
    circuits_selected_df
    .withColumnRenamed('circuitId', 'circuit_id')
    .withColumnRenamed('circuitRef', 'circuit_ref')
    .withColumnRenamed('lat', 'latitude')
    .withColumnRenamed('lng', 'longitude')
    .withColumnRenamed('alt', 'altitude')
)

##### Adding new column for ingestion date

In [0]:
# add_ingestion_date is not defined in this notebook; presumably it is provided
# by the "../includes/common_functions" include run above and returns the
# DataFrame with an ingestion-date column appended (TODO confirm column name).
circuits_final_df = add_ingestion_date(circuits_renamed_df)

##### Writing to data lake as parquet

In [0]:
# Persist the final DataFrame as parquet under the processed folder.
# 'overwrite' replaces any existing output at that path, so the notebook can
# be re-run without failing on an already-existing directory.
circuits_final_df.write.mode('overwrite').parquet(f'{PROCESSED_FOLDER_PATH}/circuits')

In [0]:
# Read the parquet output back from the processed folder and render it, as a
# quick check that the write above produced readable data.
# (display is assumed to be the notebook environment's built-in renderer.)
display(spark.read.parquet(f'{PROCESSED_FOLDER_PATH}/circuits'))