# Ingest circuits.cs file

In [0]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType

circuits_schema = StructType(fields=[StructField('circuitId', IntegerType(), False),
                                     StructField('circuitRef', StringType(), True),
                                     StructField('name', StringType(), True),
                                     StructField('location', StringType(), True),
                                     StructField('country', StringType(), True),
                                     StructField('lat', DoubleType(), True),
                                     StructField('lng', DoubleType(), True),
                                     StructField('alt', IntegerType(), True),
                                     StructField('url', StringType(), True)
                                     
])


In [0]:
circuits_df = spark.read.schema(circuits_schema).csv('/mnt/udemy/Raw/circuits.csv', header=True)

In [0]:
display(circuits_df.printSchema)

<bound method DataFrame.printSchema of DataFrame[circuitId: int, circuitRef: string, name: string, location: string, country: string, lat: double, lng: double, alt: int, url: string]>

#### Selecting columns

In [0]:
#circuits_selected_df = circuits_df.select('circuitId', 'circuitRef', 'name', 'location', 'country',
                                           #'lat', 'lng', 'alt', 'url')
# or 
#circuits_selected_df = circuits_df.select(circuits_df.circuitId, circuits_df.circuitRef, 
                                          # circuits_df.name, circuits_df.location, circuits_df.country, circuits_df.lat, circuits_df.lng, circuits_df.alt, circuits_df.url)
# or
#circuits_selected = circuilts_df.select(circuits_df["circuitId"], circuits_df["circuitRef"],
                                      #      circuits_df["name"], circuits_df["location"], circuits_df["country"], circuits_df["lat"], circuits_df["lng"], circuits_df["alt"], circuits_df["url"])
# or 
from pyspark.sql.functions import col
circuits_selected_df = circuits_df.select(col("circuitId"), col("circuitRef"), col("name"), col("location"),
                                           col("country"), col("lat"), col("lng"), col("alt"), col("url"))


In [0]:
circuits_renamed_df = circuits_selected_df.withColumnRenamed("circuitId", "id") \
                                          .withColumnRenamed("circuitRef", "ref") \
                                          .withColumnRenamed("lat", "latitude") \
                                          .withColumnRenamed("lng", "longitude") \
                                          .withColumnRenamed("alt", "altitude") \
                                          .withColumnRenamed("circuitRef", "ref") \
                                          .withColumnRenamed("lat", "latitude") \
                                          .withColumnRenamed("lng", "longitude") \
                                          .withColumnRenamed("alt", "altitude") 

In [0]:
from pyspark.sql.functions import current_timestamp
circuits_final_df = circuits_renamed_df.withColumn('ingestion_date', current_timestamp())

#### Write to datalake using parquet

In [0]:
display(dbutils.fs.ls('mnt/udemy/processed'))

path,name,size,modificationTime
dbfs:/mnt/udemy/processed/circuits/,circuits/,0,1736216611000


In [0]:
circuits_final_df.write.mode('overwrite').parquet('mnt/udemy/processed/circuits')
display('mnt/udemy/processed/circuits')

'mnt/udemy/processed/circuits'

#### combine two columns


In [0]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType
from pyspark.sql.functions import current_timestamp, to_timestamp, concat, col, lit

races_schema = StructType(fields=[StructField('raceId', IntegerType(), False),
                                  StructField('year', IntegerType(), True),
                                  StructField('round', IntegerType(), True),
                                  StructField('circuitId', IntegerType(), True),
                                  StructField('name', StringType(), True),
                                  StructField('date', StringType(), True),
                                  StructField('time', StringType(), True),
                                  StructField('url', StringType(), True),                                  
                                  ])

races_df = spark.read.csv('/mnt/udemy/Raw/races.csv', header=True, schema=races_schema)

# Add ingestion date and race_timestamp to dataframe
races_df = races_df.withColumn('ingestion_date', current_timestamp()) \
                   .withColumn('race_timestamp', to_timestamp(concat(col('date'), lit(' '), col('time')), 'yyyy-MM-dd HH:mm:ss'))

# Select only the columns required
races_selected_df = races_df.select(col('raceId').alias('race_id'),
                                     col('year').alias('race_year'),
                                     col('round'),
                                     col('circuitId').alias('circuit_id'),
                                     col('name'),
                                     col('ingestion_date'),
                                     col('race_timestamp'))

In [0]:
# Write the data
races_selected_df.write.mode('overwrite').partitionBy('race_year').parquet('/mnt/udemy/processed/races')

Number of records: 1058


In [0]:
written_df = spark.read.parquet('/mnt/udemy/processed/races')
print(f"Number of records: {written_df.count()}")

Number of records: 1058
