#### Definições

In [0]:
# spark
from pyspark.sql.functions import year, month, quarter, weekofyear, dayofweek, date_format, col, concat, coalesce, lit

##### Carregando dados Trusted

In [0]:
base_trusted = spark.read.option("header", "true").option("sep", "|").csv("/mnt/projetointegrador/trusted/base_trusted/")
base_trusted = base_trusted.alias('base_trusted')

In [0]:
base_trusted.createOrReplaceTempView('base_trusted')

In [0]:
%sql select * from base_trusted where DATE_ID is null

FONTE,STATE,COUNTRY,LAT,LONG,CONFIRMED_AC,DEATHS_AC,RECOVERED_AC,ACTIVE_AC,INCIDENT_RATE,CASE_FATALITY_RATIO,CONFIRMED,DEATHS,RECOVERED,ACTIVE,DATE_ID,LOCATION_KEY
GENERAL,,Antigua and Barbuda,17.0608,-61.7964,7,0,0.0,7.0,,0.0,7,0,0.0,7.0,,Antigua and Barbuda
GENERAL,,Antigua and Barbuda,17.0608,-61.7964,7,0,0.0,7.0,,0.0,0,0,0.0,0.0,,Antigua and Barbuda
GENERAL,,Antigua and Barbuda,17.0608,-61.7964,7,0,0.0,7.0,,0.0,0,0,0.0,0.0,,Antigua and Barbuda
GENERAL,,Antigua and Barbuda,17.0608,-61.7964,1,0,0.0,1.0,,0.0,-6,0,0.0,-6.0,,Antigua and Barbuda
GENERAL,,Antigua and Barbuda,17.0608,-61.7964,15,0,0.0,15.0,,0.0,14,0,0.0,14.0,,Antigua and Barbuda
GENERAL,,Antigua and Barbuda,17.0608,-61.7964,15,0,0.0,15.0,,0.0,0,0,0.0,0.0,,Antigua and Barbuda
GENERAL,,Antigua and Barbuda,17.0608,-61.7964,9,0,0.0,9.0,,0.0,-6,0,0.0,-6.0,,Antigua and Barbuda
GENERAL,,Austria,47.5162,14.5501,8717,120,636.0,8874.0,,1.3766203969255475,8717,120,636.0,8874.0,,Austria
GENERAL,,Austria,47.5162,14.5501,8100,101,479.0,8223.0,,1.2469135802469136,-617,-19,-157.0,-651.0,,Austria
GENERAL,,Austria,47.5162,14.5501,7293,81,225.0,7978.0,,1.1106540518305226,-807,-20,-254.0,-245.0,,Austria


In [0]:
base_trusted = (
    base_trusted
        .withColumn('LOCATION_KEY', concat(coalesce(base_trusted.STATE, lit("")), base_trusted.COUNTRY))
)

##### dim_location

In [0]:
dim_location = (
    base_trusted
        .select("LOCATION_KEY", "COUNTRY", "STATE", "LAT", "LONG")
        .dropDuplicates(["LOCATION_KEY"])
)

dim_location = dim_location.select("LOCATION_KEY", "COUNTRY", "STATE", "LAT", "LONG")
dim_location = dim_location.alias('dim_location')

##### dim_calendar

In [0]:
dim_calendar =(
    base_trusted
        .select("DATE_ID")
        .distinct()
        .withColumn("Ano", year("DATE_ID"))
        .withColumn("Nome do Mês", date_format("DATE_ID", "MMMM"))
        .withColumn("Mês", month("DATE_ID"))
        .withColumn("Trimestre", quarter("DATE_ID"))
        .withColumn("Semana do Ano", weekofyear("DATE_ID"))
        .withColumn("Nome do Dia", date_format("DATE_ID", "EEEE"))
        .withColumn("Dia da Semana", dayofweek("DATE_ID"))
)

##### fato_covid

In [0]:
fato_covid = base_trusted.select("LOCATION_KEY", "DATE_ID", "FONTE", "CONFIRMED", "DEATHS", "RECOVERED", "ACTIVE","CONFIRMED_AC", "DEATHS_AC", "RECOVERED_AC", "ACTIVE_AC", "INCIDENT_RATE", "CASE_FATALITY_RATIO")

In [0]:
fato_covid = fato_covid.withColumn("DATE_KEY_PART", col("DATE_ID"))

##### Exportando Refined

In [0]:
cam_final = "/mnt/projetointegrador/refined/fato_covid"

# fato
fato_covid.write.format('csv').mode('overwrite').partitionBy('DATE_KEY_PART').options(header='True', delimiter='|').save(cam_final)

In [0]:
cam_final = "/mnt/projetointegrador/refined/dim_location"

# dim_location
dim_location.write.format('csv').mode('overwrite').options(header='True', delimiter='|').save(cam_final)

In [0]:
cam_final = "/mnt/projetointegrador/refined/dim_calendar"

#dim_calendar
dim_calendar.write.format('csv').mode('overwrite').options(header='True', delimiter='|').save(cam_final)

In [0]:
cam_final = "/mnt/projetointegrador/refined/base_trusted"

#base_trusted
base_trusted.write.format('csv').mode('overwrite').options(header='True', delimiter='|').save(cam_final)