## Início - Import de funções e SparkSession

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, to_timestamp, month, year, to_date, sum, avg, floor, round
from pyspark.sql.window import Window

spark = SparkSession.builder \
            .master("local[*]") \
            .appName("airflow_prod") \
            .config('spark.executor.memory', '6g') \
            .config('spark.driver.memory', '6g') \
            .config("spark.driver.maxResultSize", "1048MB") \
            .config("spark.port.maxRetries", "100") \
            .getOrCreate()

diretorio = "/home/jovyan/work/notebooks/datalake"

## Funções para a camada Trusted

In [2]:
def realiza_pivot(df, tipo):
    # separa as colunas datas para realizar pivot
    colunas = df.columns[4:]

    # coleta numero de colunas
    n = len(colunas)
    
    # expressao para realizar o pivot
    expr = "stack({}, {}) as (data, {})".format(n, ", ".join(["'{}', `{}`".format(col, col) for col in colunas]),tipo)
    
    # realiza pivot
    df = df.selectExpr("estado", "pais", "latitude", "longitude", expr)
    return df

In [3]:
def renomea_colunas(df):
    df = df.withColumnRenamed("Province/State", "estado") \
        .withColumnRenamed("Country/Region", "pais") \
        .withColumnRenamed("Lat", "latitude") \
        .withColumnRenamed("Long", "longitude")
    return df

In [4]:
def trunca_colunas(df):
    # flor retorna o valor inteiro da multiplicação longitudade * 100 e após divido por 100 para ficar com 2 casas decimais
    df = df.withColumn("longitude", floor(df["longitude"] * 100) / 100).withColumn("latitude", floor(df["latitude"] * 100) / 100)
    return df

## Início da camada Trusted

In [5]:
# recupera dados e renomea colunas
df_recovered = spark.read.options(header='true',inferSchema=True).csv(diretorio + '/raw/covid19/time_series_covid19_recovered_global.csv')
df_recovered = renomea_colunas(df_recovered)
df_recovered = trunca_colunas(df_recovered) # algumas colunas lat e long estavam com incosistencia nos decimais

df_confirmed = spark.read.options(header='true',inferSchema=True).csv(diretorio + '/raw/covid19/time_series_covid19_confirmed_global.csv')
df_confirmed = renomea_colunas(df_confirmed)
df_confirmed = trunca_colunas(df_confirmed)

df_deaths = spark.read.options(header='true',inferSchema=True).csv(diretorio + '/raw/covid19/time_series_covid19_deaths_global.csv')
df_deaths = renomea_colunas(df_deaths)
df_deaths = trunca_colunas(df_deaths)

In [6]:
# realiza pivot de todas as tabelas
recovered = realiza_pivot(df_recovered, "quantidade_recuperados")

confirmed = realiza_pivot(df_confirmed, "quantidade_confirmados")

deaths = realiza_pivot(df_deaths, "quantidade_mortes")

In [7]:
# inner join entre as tabelas deaths e confirmed
join = confirmed.join( deaths,((confirmed.latitude == deaths.latitude) & (confirmed.longitude == deaths.longitude)  & (confirmed.data == deaths.data) & (confirmed.pais == deaths.pais)), "inner" ) \
    .select(deaths["*"], confirmed["quantidade_confirmados"])


In [8]:
# right join entre as tabelas join e recovered
join = recovered.join(join,((recovered.latitude == join.latitude) & (recovered.longitude == join.longitude)  & (recovered.data == join.data) & (recovered.pais == join.pais)), "right" ) \
    .select(join["*"], recovered["quantidade_recuperados"])

In [9]:
# altera os tipos de dados
trusted = join.withColumn("data", to_timestamp(col("data"), "M/d/yy")) \
    .withColumn("quantidade_mortes", col("quantidade_mortes").cast("long")) \
    .withColumn("quantidade_confirmados", col("quantidade_confirmados").cast("long")) \
    .withColumn("quantidade_recuperados", col("quantidade_recuperados").cast("long")) \
    .withColumn("mes", month("data")) \
    .withColumn("ano", year("data"))

In [10]:
trusted

DataFrame[estado: string, pais: string, latitude: double, longitude: double, data: timestamp, quantidade_mortes: bigint, quantidade_confirmados: bigint, quantidade_recuperados: bigint, mes: int, ano: int]

In [11]:
trusted.filter((trusted["pais"] == "Armenia")).orderBy(col("data").desc()).show(10)

+------+-------+--------+---------+-------------------+-----------------+----------------------+----------------------+---+----+
|estado|   pais|latitude|longitude|               data|quantidade_mortes|quantidade_confirmados|quantidade_recuperados|mes| ano|
+------+-------+--------+---------+-------------------+-----------------+----------------------+----------------------+---+----+
|  NULL|Armenia|   40.06|    45.03|2021-05-12 00:00:00|             4272|                219950|                206078|  5|2021|
|  NULL|Armenia|   40.06|    45.03|2021-05-11 00:00:00|             4256|                219596|                205675|  5|2021|
|  NULL|Armenia|   40.06|    45.03|2021-05-10 00:00:00|             4249|                219353|                205200|  5|2021|
|  NULL|Armenia|   40.06|    45.03|2021-05-09 00:00:00|             4234|                219270|                204578|  5|2021|
|  NULL|Armenia|   40.06|    45.03|2021-05-08 00:00:00|             4225|                219092| 

In [12]:
# grava os dados em um unico arquivo particionado por ano e mes no formato parquet
trusted.repartition(1).write.format("parquet").option("header", "true").mode("overwrite").partitionBy("ano","mes").save(diretorio + "/Trusted")

## Início da camada Refinada

In [13]:
# leitura dos dados da camada trusted
df_trusted = spark.read.options(header='true',inferSchema=True).parquet(diretorio + "/Trusted")

In [14]:
# seleção dos dados para a camada refined
df_trusted = df_trusted.select("pais","data", "quantidade_confirmados", "quantidade_mortes", "quantidade_recuperados", "ano")

In [15]:
# agregação dos dados por pais e data
agg = df_trusted.groupBy("pais","data","ano").agg(sum("quantidade_confirmados"), sum("quantidade_mortes"), sum("quantidade_recuperados")).orderBy("data")

In [16]:
# cria janela particionada por pais e ordenada por data com intervalo de 7 valores
window = Window.partitionBy("pais").orderBy("data").rowsBetween(-6, 0)

# cria coluna media movel e faz o arredondamente para duas casas decimais
refined_media_movel = agg.withColumn("media_movel_mortes", round(avg(col("sum(quantidade_mortes)")).over(window), 2)) \
    .withColumn("media_movel_confirmados", round(avg(col("sum(quantidade_confirmados)")).over(window), 2)) \
    .withColumn("media_movel_recuperados", round(avg(col("sum(quantidade_recuperados)")).over(window), 2))

In [17]:
# selecão de colunas e cast das colunas para long
refined = refined_media_movel .select("pais","data","media_movel_confirmados", "media_movel_mortes","media_movel_recuperados", "ano") \
    .withColumn("media_movel_mortes", col("media_movel_mortes").cast("long")) \
    .withColumn("media_movel_confirmados", col("media_movel_confirmados").cast("long")) \
    .withColumn("media_movel_recuperados", col("media_movel_recuperados").cast("long"))

In [18]:
refined

DataFrame[pais: string, data: timestamp, media_movel_confirmados: bigint, media_movel_mortes: bigint, media_movel_recuperados: bigint, ano: int]

In [19]:
refined.filter((refined["pais"] == "Armenia")).orderBy(col("data").desc()).show(10)

+-------+-------------------+-----------------------+------------------+-----------------------+----+
|   pais|               data|media_movel_confirmados|media_movel_mortes|media_movel_recuperados| ano|
+-------+-------------------+-----------------------+------------------+-----------------------+----+
|Armenia|2021-05-12 00:00:00|                 219181|              4233|                 204461|2021|
|Armenia|2021-05-11 00:00:00|                 218888|              4220|                 203816|2021|
|Armenia|2021-05-10 00:00:00|                 218575|              4207|                 203163|2021|
|Armenia|2021-05-09 00:00:00|                 218240|              4193|                 202488|2021|
|Armenia|2021-05-08 00:00:00|                 217896|              4179|                 201799|2021|
|Armenia|2021-05-07 00:00:00|                 217540|              4165|                 201084|2021|
|Armenia|2021-05-06 00:00:00|                 217166|              4151|          

In [20]:
# grava os dados em um unico arquivo particionando por ano no formato parquet na camada refinada
refined.repartition(1).write.format("parquet").option("header", "true").mode("overwrite").partitionBy("ano").save(diretorio + "/Refined")

In [None]:
spark.stop()