## Inicia as Funções e Importações

In [0]:
from pyspark.sql import functions as F

def run_read_path(caminho: str):
    """Read the CSV data at ``caminho`` into a Spark DataFrame.

    Uses the notebook-global ``spark`` session. The reader is configured to
    treat the first row as a header, split fields on commas and infer the
    column types.

    Args:
        caminho: Path handed to the Spark CSV reader.

    Returns:
        The DataFrame produced by the reader.
    """
    leitor = spark.read.options(inferSchema='True', delimiter=",", header=True)
    return leitor.csv(caminho)

def run_cap_schema(tabela):
    """Return the column names registered for the table ``tabela``.

    Args:
        tabela: Table name to look up.

    Returns:
        A new list with the column names for ``tabela``, or an empty list
        when no column subset is registered for it.
    """
    esquemas = (
        ('circuits',
         ('circuitId', 'circuitRef', 'name', 'location', 'country',
          'extracted_at')),
        ('constructors',
         ('constructorId', 'constructorRef', 'name', 'nationality',
          'extracted_at')),
        ('drivers',
         ('driverId', 'driverRef', 'number', 'code', 'forename', 'surname',
          'dob', 'nationality', 'extracted_at')),
        ('races',
         ('raceId', 'year', 'round', 'circuitId', 'name', 'date',
          'extracted_at')),
        ('seasons',
         ('year', 'extracted_at')),
    )
    for nome, colunas in esquemas:
        if tabela == nome:
            # Hand back a fresh list so callers may mutate it freely.
            return list(colunas)
    return []

def run_schema_novo(tabela: str, df):
    """Trim ``df`` to the registered columns and stamp it with ``created_at``.

    When ``run_cap_schema`` returns columns for ``tabela`` only those columns
    are selected; otherwise every column of ``df`` is kept. In both cases a
    ``created_at`` column holding ``F.current_date()`` is added.

    Args:
        tabela: Table name used to look up the column list.
        df: Spark DataFrame to transform.

    Returns:
        The transformed DataFrame.
    """
    colunas = run_cap_schema(tabela)
    selecionado = df.select(*colunas) if colunas else df
    return selecionado.withColumn('created_at', F.current_date())
    
def run_save_parquet(df, caminho):
    """Write ``df`` to ``caminho`` as Parquet partitioned by ``created_at``.

    The DataFrame is coalesced to a single partition before writing, and the
    writer runs in ``overwrite`` mode.

    Args:
        df: Spark DataFrame to persist; it must contain a ``created_at``
            column because the output is partitioned by it.
        caminho: Destination path passed to the writer's ``save``.
    """
    escritor = df.coalesce(1).write
    escritor = escritor.format('parquet').mode('overwrite')
    escritor.partitionBy('created_at').save(caminho)

def run_load_files_silver(caminho):
    """Convert every known table found under ``caminho`` to silver Parquet.

    Lists ``caminho`` with ``dbutils.fs.ls``. For each entry whose name (with
    slashes removed) appears in the module-level ``table_names``, the entry is
    read as CSV, passed through ``run_schema_novo`` and written with
    ``run_save_parquet`` to ``path_silver + <table name>``. Entries whose name
    is not in ``table_names`` are skipped.

    Args:
        caminho: Location to list for the raw table entries.
    """
    # List the location the caller asked for. The previous implementation
    # ignored this argument and always listed the global ``path_raw``.
    for entrada in dbutils.fs.ls(caminho):
        # Index 1 is used as the entry name and index 0 as its full path.
        tabela = str(entrada[1]).replace("/", "")
        if tabela not in table_names:
            continue
        df_rt = run_schema_novo(tabela, run_read_path(str(entrada[0])))
        run_save_parquet(df_rt, path_silver + tabela)
    
        

# Roda as funções definidas criando os arquivos SILVER em Parquet

In [0]:
# Source (raw) and destination (silver) locations used by the load step.
path_raw = "s3://f1-prj-data-lake-raw/f1/data/"
path_silver = "s3://f1-prj-data-lake-silver/f1/data/"

# Entry names that run_load_files_silver will process; anything else found
# while listing is skipped.
table_names = [
    "circuits",
    "constructor_results",
    "constructor_standings",
    "constructors",
    "driver_standings",
    "drivers",
    "lap_times",
    "pit_stops",
    "qualifying",
    "races",
    "results",
    "seasons",
    "status",
]

# Build the silver Parquet datasets from the raw location.
run_load_files_silver(path_raw)

# Cria Views das tabelas Silver para montagem das estatísticas

In [0]:
# Load the silver "constructor_results" Parquet data and register it as a
# temporary view so it can be queried with Spark SQL.
df = (
    spark.read
    .format("parquet")
    .load("s3://f1-prj-data-lake-silver/f1/data/constructor_results")
)
df.createOrReplaceTempView("constructor_results")

In [0]:
# Load the silver "constructors" Parquet data and register it as a temporary
# view so it can be queried with Spark SQL.
df = (
    spark.read
    .format("parquet")
    .load("s3://f1-prj-data-lake-silver/f1/data/constructors")
)
df.createOrReplaceTempView("constructors")

In [0]:
# Load the silver "races" Parquet data and register it as a temporary view so
# it can be queried with Spark SQL.
df = (
    spark.read
    .format("parquet")
    .load("s3://f1-prj-data-lake-silver/f1/data/races")
)
df.createOrReplaceTempView("races")

In [0]:
# Load the silver "results" Parquet data and register it as a temporary view
# so it can be queried with Spark SQL.
df = (
    spark.read
    .format("parquet")
    .load("s3://f1-prj-data-lake-silver/f1/data/results")
)
df.createOrReplaceTempView("results")

In [0]:
# Load the silver "drivers" Parquet data and register it as a temporary view
# so it can be queried with Spark SQL.
df = (
    spark.read
    .format("parquet")
    .load("s3://f1-prj-data-lake-silver/f1/data/drivers")
)
df.createOrReplaceTempView("drivers")

# Cria as Views no Bucket GOLD

In [0]:
# Gold view "v_pilots": joins results with constructors, drivers and races to
# list position, race, year, driver name, nationality and constructor, then
# stamps the rows with created_at and saves them as Parquet.
# The adjacent string literals concatenate into a single SQL statement.
df_view = spark.sql(
    "select  rs.position as Posicao, rc.name as Corrida, rc.`year` as Ano, "
    "concat(dr.forename,' ' ,dr.surname) as Piloto, "
    "dr.nationality as Nacionalidade, cnt.name as Construtor "
    "from results as rs "
    "inner join constructors as cnt on (rs.constructorId = cnt.constructorId) "
    "inner join drivers as dr on (rs.driverId = dr.driverId) "
    "inner join races as rc on (rs.raceId = rc.raceId) "
    "order by rc.`year` desc"
).withColumn('created_at', F.current_date())
run_save_parquet(df_view, "s3://f1-prj-data-lake-gold/f1/data/v_pilots")

In [0]:
# Gold view "v_construtores": sums constructor_results points per constructor,
# nationality and year, ordered by the summed points descending, then stamps
# the rows with created_at and saves them as Parquet.
# The adjacent string literals concatenate into a single SQL statement.
df_view = spark.sql(
    "select sum(cr.points) as Pontos, cnt.name as Construtor, "
    "cnt.nationality as Pais, rc.`year` as Ano "
    "from constructor_results as cr "
    "inner join constructors as cnt on (cnt.constructorId = cr.constructorId) "
    "inner join races as rc on ( rc.raceId = cr.raceId) "
    "group by cnt.name, cnt.nationality, rc.`year` "
    "order by 1 desc"
).withColumn('created_at', F.current_date())
run_save_parquet(df_view, "s3://f1-prj-data-lake-gold/f1/data/v_construtores")

# Cria as tabelas para Visualização baseadas nas views no bucket gold

In [0]:
# Load the gold "v_construtores" Parquet data and register it as a temporary
# view so it can be queried with Spark SQL.
df = (
    spark.read
    .format("parquet")
    .load("s3://f1-prj-data-lake-gold/f1/data/v_construtores")
)
df.createOrReplaceTempView("v_construtores")
%sql CREATE TABLE default.v_construtores as select * from v_construtores

Pontos,Construtor,Pais,Ano,created_at
765.0,Mercedes,German,2016,2022-03-29
739.0,Mercedes,German,2019,2022-03-29
703.0,Mercedes,German,2015,2022-03-29
701.0,Mercedes,German,2014,2022-03-29
668.0,Mercedes,German,2017,2022-03-29
655.0,Mercedes,German,2018,2022-03-29
650.0,Red Bull,Austrian,2011,2022-03-29
613.5,Mercedes,German,2021,2022-03-29
596.0,Red Bull,Austrian,2013,2022-03-29
585.5,Red Bull,Austrian,2021,2022-03-29


In [0]:
# Load the gold "v_pilots" Parquet data and register it as a temporary view
# so it can be queried with Spark SQL.
df = (
    spark.read
    .format("parquet")
    .load("s3://f1-prj-data-lake-gold/f1/data/v_pilots")
)
df.createOrReplaceTempView("v_pilots")
%sql CREATE TABLE default.v_pilots as select * from v_pilots

# Checa se as tabelas foram criadas com sucesso

In [0]:
%sql show tables

database,tableName,isTemporary
default,v_construtores,False
default,v_pilots,False
,v_construtores,True
