In [0]:
%run ./utils

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper, col,round,trim,md5,concat_ws,lit,current_timestamp
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,FloatType
from pyspark.sql.utils import AnalysisException

# Build the Spark session for this notebook, or reuse one that already exists
# (getOrCreate returns the active session when there is one).
spark = (
    SparkSession.builder
    .appName('f1_dwh')
    .getOrCreate()
)

# Set up the S3 mount point. s3_mnt is not defined in this notebook —
# presumably it comes from the ./utils notebook pulled in by %run above.
s3_mnt()

There is already mount point with f1-s3-mnt


In [0]:
# Explicit schema for circuits.csv so column types are declared rather than inferred.
circuits_schema = StructType([
    StructField("circuitId", IntegerType(), False),
    StructField("circuitRef", StringType(), True),
    StructField("name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("country", StringType(), True),
    StructField("lat", FloatType(), True),
    StructField("lng", FloatType(), True),
    StructField("alt", IntegerType(), True),
    StructField("url", StringType(), True),
])

circuits_df = (
    spark.read
    .option('header', True)
    .schema(circuits_schema)
    .csv('dbfs:/mnt/f1-s3-mnt/raw_files/circuits.csv')
)

# Clean every column in one projection, keeping the original column order:
#   * string columns (except 'url') are upper-cased and trimmed,
#   * float columns are rounded to 4 decimal places,
#   * every column name is upper-cased.
cleaned_columns = []
for field in circuits_df.schema.fields:
    cleaned = col(field.name)
    if isinstance(field.dataType, StringType) and field.name != 'url':
        cleaned = trim(upper(cleaned))
    elif isinstance(field.dataType, FloatType):
        cleaned = round(cleaned, 4)
    cleaned_columns.append(cleaned.alias(field.name.upper()))
circuits_df = circuits_df.select(*cleaned_columns)

# The source calls this attribute "location"; downstream it is named CITY.
circuits_df = circuits_df.withColumnRenamed('LOCATION', 'CITY')

# Data vault hash key: upper-cased MD5 of the business key CIRCUITID.
hub_hash_key = upper(md5(col('CIRCUITID').cast(StringType())))
circuits_df = circuits_df.withColumn('CIRCUITID_HK', hub_hash_key)

# Hashdiff: upper-cased MD5 over every column after CIRCUITID. At this point that
# list also contains CIRCUITID_HK, because the hash key was added just above.
# NOTE(review): concat_ws is called with an empty separator, so adjacent values are
# joined without a delimiter — confirm this is intended before relying on the
# hashdiff for change detection.
hashdiff_inputs = circuits_df.columns[1:]
hashdiff = upper(md5(concat_ws('', *hashdiff_inputs)).cast(StringType()))
circuits_df = circuits_df.withColumn('CIRCUITS_HASDIFF', hashdiff)

# Audit columns: where the record came from and when it was loaded.
circuits_df = (
    circuits_df
    .withColumn('RECORD_SOURCE', lit('CIRCUITS.CSV'))
    .withColumn('LOAD_DATE', current_timestamp())
)

# Show the result to verify the changes.
circuits_df.show()


+---------+--------------+--------------------+------------+---------+--------+--------+---+--------------------+--------------------+--------------------+-------------+--------------------+
|CIRCUITID|    CIRCUITREF|                NAME|        CITY|  COUNTRY|     LAT|     LNG|ALT|                 URL|        CIRCUITID_HK|    CIRCUITS_HASDIFF|RECORD_SOURCE|           LOAD_DATE|
+---------+--------------+--------------------+------------+---------+--------+--------+---+--------------------+--------------------+--------------------+-------------+--------------------+
|        1|   ALBERT_PARK|ALBERT PARK GRAND...|   MELBOURNE|AUSTRALIA|-37.8497| 144.968| 10|http://en.wikiped...|C4CA4238A0B923820...|02DE05364A81A4A9B...| CIRCUITS.CSV|2022-04-04 06:08:...|
|        2|        SEPANG|SEPANG INTERNATIO...|KUALA LUMPUR| MALAYSIA|  2.7608| 101.738| 18|http://en.wikiped...|C81E728D9D4C2F636...|1BE4DA16C02F49594...| CIRCUITS.CSV|2022-04-04 06:08:...|
|        3|       BAHRAIN|BAHRAIN INTERNATI..

LOADING DATA INTO HUB_CIRCUITS & SAT_CIRCUITS

In [0]:
from pyspark.sql.functions import broadcast
# Load HUB_CIRCUITS in RDV_TABLE/.
#
# If the hub already exists, append only the hash keys that are not in it yet;
# otherwise create the hub from the full staged dataset.
#
# Fixes compared with the previous version:
#   * the new rows now carry CIRCUITID (CIRCUITID_HK used to be selected twice and
#     CIRCUITID was dropped),
#   * a left anti join keeps only keys missing from the hub (the right outer join
#     had no filter, so every incoming row was treated as new),
#   * the append goes to the same absolute path that is read (the old target
#     'mnt/...' had no leading slash),
#   * only a failed read of the hub triggers the initial load; any other error now
#     surfaces instead of silently overwriting the hub.
hub_circuits_path = '/mnt/f1-s3-mnt/processed_file/RDV_TABLE/HUB_CIRCUITS/'
hub_stage_df = circuits_df.select('CIRCUITID_HK', 'CIRCUITID', 'RECORD_SOURCE', 'LOAD_DATE')

try:
    hub_df = spark.read.parquet(hub_circuits_path)
except AnalysisException:
    # Raised by the reader when the path does not exist or no parquet schema can be
    # inferred from it, i.e. the hub has not been written yet.
    hub_df = None

if hub_df is None:
    # First load: write the hub from scratch.
    hub_stage_df.write.mode('overwrite').format('parquet').save(hub_circuits_path)
else:
    # Incremental load: keep staged rows whose hash key is absent from the hub.
    new_hub_rows_df = hub_stage_df.join(
        hub_df.select('CIRCUITID_HK'),
        ['CIRCUITID_HK'],
        'left_anti',
    )
    new_hub_rows_df.write.mode('append').format('parquet').save(hub_circuits_path)

# Load SAT_CIRCUITS in RDV_TABLE/.
#
# If the satellite already exists, append only rows whose (hash key, hashdiff) pair
# is not stored yet; otherwise write the full staged dataset.
#
# Fixes compared with the previous version:
#   * COUNTRY is part of the staged projection (it was missing from the joined side
#     but requested in the final select),
#   * a left anti join keeps only new or changed rows (the right outer join had no
#     filter, so unchanged rows were appended again on every run),
#   * the append goes to the same absolute path that is read (the old target
#     'mnt/...' had no leading slash),
#   * only a failed read of the satellite triggers the full load; any other error
#     now surfaces instead of re-appending the whole dataset.
sat_circuits_path = '/mnt/f1-s3-mnt/processed_file/RDV_TABLE/SAT_CIRCUITS/'
sat_stage_df = circuits_df.select(
    'RECORD_SOURCE', 'LOAD_DATE', 'CIRCUITID_HK', 'CIRCUITS_HASDIFF',
    'CIRCUITREF', 'NAME', 'CITY', 'COUNTRY', 'LAT', 'LNG', 'ALT', 'URL',
)

try:
    sat_df = spark.read.parquet(sat_circuits_path)
except AnalysisException:
    # Raised by the reader when the path does not exist or no parquet schema can be
    # inferred from it, i.e. the satellite has not been written yet.
    print('May be file is missing or data being loaded freshly')
    sat_df = None

if sat_df is None:
    # First load: write every staged row.
    sat_stage_df.write.mode('append').format('parquet').save(sat_circuits_path)
else:
    # Incremental load: a staged row is new when no stored row has the same hash key
    # and the same hashdiff.
    new_sat_rows_df = sat_stage_df.join(
        sat_df.select('CIRCUITID_HK', 'CIRCUITS_HASDIFF'),
        ['CIRCUITID_HK', 'CIRCUITS_HASDIFF'],
        'left_anti',
    )
    new_sat_rows_df.write.mode('append').format('parquet').save(sat_circuits_path)

  

    

May be file is missing or data being loaded freshly


In [0]:
%fs
ls /mnt/f1-s3-mnt/processed_file/RDV_TABLE

path,name,size
dbfs:/mnt/f1-s3-mnt/processed_file/RDV_TABLE/HUB_CIRCUITS/,HUB_CIRCUITS/,0
dbfs:/mnt/f1-s3-mnt/processed_file/RDV_TABLE/SAT_CIRCUITS/,SAT_CIRCUITS/,0
