# Unificação e tratamento de dados

##Imports

In [0]:
from pyspark.sql.functions import col, when, desc, lower, concat_ws, concat, to_date, lit, date_format, split, explode, size, lpad

## Queries para buscar dados de todos os anos

In [0]:
%sql
USE CATALOG controlled_medication_data;
USE SCHEMA bronze_layer;

In [0]:
df_2014 = spark.sql("SELECT * FROM sngpc_2014")
df_2015 = spark.sql("SELECT * FROM sngpc_2015")
df_2016 = spark.sql("SELECT * FROM sngpc_2016")
df_2017 = spark.sql("SELECT * FROM sngpc_2017")
df_2018 = spark.sql("SELECT * FROM sngpc_2018")
df_2019 = spark.sql("SELECT * FROM sngpc_2019")
df_2020 = spark.sql("SELECT * FROM sngpc_2020")
df_2021 = spark.sql("SELECT * FROM sngpc_2021")


##Unindo todos os dataframes em um só

In [0]:
df_union = df_2014.union(df_2015).union(df_2016).union(df_2017).union(df_2018).union(df_2019).union(df_2020).union(df_2021)

In [0]:
df_union.display()


In [0]:
df_union.count()

## Analisando dados do dataframe

In [0]:
df_union.select("ANO_VENDA").distinct().display()

In [0]:
df_union.select("MES_VENDA").distinct().display()

In [0]:
df_union.select("UF_VENDA").distinct().display()

In [0]:
df_union.select("MUNICIPIO_VENDA").distinct().display()

In [0]:
df_union.select("PRINCIPIO_ATIVO").distinct().display()

In [0]:
df_union.select("DESCRICAO_APRESENTACAO").distinct().display()

In [0]:
df_union.select("QTD_VENDIDA").distinct().display()

In [0]:
df_union.select("UNIDADE_MEDIDA").distinct().display()

In [0]:
df_union.select("CONSELHO_PRESCRITOR").distinct().display()

In [0]:
df_union.select("UF_CONSELHO_PRESCRITOR").distinct().display()

In [0]:
df_union.select("TIPO_RECEITUARIO").distinct().display()

In [0]:
df_union.select("CID10").distinct().display()

In [0]:
df_union.select("SEXO").distinct().display()

In [0]:
df_union.select("IDADE").distinct().display()
df_union.select("UNIDADE_IDADE").distinct().display()

## Limpeza e tratamento de dados

In [0]:
df_union.count()

In [0]:
df_union = df_union.dropna(how="all")
#df_cleaned = df.dropna(how="all")

In [0]:
df_union.count()

In [0]:
df_copia = df_union

In [0]:
df_copia =  df_copia.fillna(-1, ['QTD_VENDIDA','UNIDADE_IDADE', 'IDADE', 'SEXO', 'TIPO_RECEITUARIO'])
df_copia =  df_copia.fillna("N/A", ['ANO_VENDA','MES_VENDA' , 'UF_VENDA', 'MUNICIPIO_VENDA', 'PRINCIPIO_ATIVO', 'DESCRICAO_APRESENTACAO', 'CONSELHO_PRESCRITOR', 'UF_CONSELHO_PRESCRITOR', 'CID10'])


In [0]:
df_copia.filter((col("UF_VENDA") == "SP") & (col("MUNICIPIO_VENDA") == "SÃO PAULO")).display()

##Unificando dataframe das vendas com o de municipios

In [0]:
df_municipios = spark.sql("SELECT * FROM controlled_medication_data.bronze_layer.municipios")

In [0]:


#deixando todas as colunas em lower case para realizar o join
df_vendas_padronizado = df_copia.withColumn("MUNICIPIO_VENDA_LOWER", lower(col("MUNICIPIO_VENDA")))
df_municipios_padronizado = df_municipios.withColumn("nome_municipio_lower", lower(col("nome_municipio")))

#Unindo dois dataframes aonde os campos uf_venda == uf e municipio_venda == nome_municipio
df_vendas_com_municipio = df_vendas_padronizado.join(
    df_municipios_padronizado,
    on=(
        (col("UF_VENDA") == col("uf")) &
        (col("MUNICIPIO_VENDA_LOWER") == col("nome_municipio_lower"))
    ),
    how="left" 
)

df_vendas_com_municipio.display()

In [0]:
df_vendas_com_municipio.count()

##Tratando schema da tabela, dropando colunas e arrumando os nomes das tabelas para o padrao snake_case

In [0]:
df_vendas_com_municipio = df_vendas_com_municipio.drop("ddd", "fuso_horario", "MUNICIPIO_VENDA_LOWER", "nome_municipio_lower", "nome_municipio", "uf")

In [0]:
df_vendas_com_municipio = df_vendas_com_municipio.withColumnRenamed("UF_VENDA", "uf_venda") \
    .withColumnRenamed("ANO_VENDA", "ano_venda") \
    .withColumnRenamed("MES_VENDA", "mes_venda") \
    .withColumnRenamed("PRINCIPIO_ATIVO", "principio_ativo") \
    .withColumnRenamed("DESCRICAO_APRESENTACAO", "descricao_apresentacao") \
    .withColumnRenamed("QTD_VENDIDA", "qtd_vendida") \
    .withColumnRenamed("UNIDADE_MEDIDA", "unidade_medida") \
    .withColumnRenamed("CONSELHO_PRESCRITOR", "conselho_prescritor") \
    .withColumnRenamed("UF_CONSELHO_PRESCRITOR", "uf_conselho_prescritor") \
    .withColumnRenamed("SEXO", "sexo_comprador") \
    .withColumnRenamed("IDADE", "idade_comprador") \
    .withColumnRenamed("UNIDADE_IDADE", "unidade_idade_comprador") \
    .withColumnRenamed("TIPO_RECEITUARIO", "tipo_receituario") \
    .withColumnRenamed("MUNICIPIO_VENDA", "municipio_venda")


In [0]:
df_vendas_com_municipio.display()

## Transformação das idades de compradores para anos e transformação para tipo inteiro da coluna "idade_comprador"

In [0]:
df_vendas = df_vendas_com_municipio

###transformação de campos com idade em meses para anos

In [0]:
df_vendas = df_vendas.withColumn(
    "idade_comprador",
    when(col("unidade_idade_comprador") == 2, col("idade_comprador")/12)
    .otherwise(col("idade_comprador"))
)

### correção de campos com o valor menor que 1 para 1

In [0]:
df_vendas = df_vendas.withColumn(
    "idade_comprador", 
    when((col("idade_comprador") < 1) & (col("idade_comprador") != -1), 1)
    .otherwise(col("idade_comprador"))
)

### Transformando tipo da coluna para int

In [0]:
df_vendas = df_vendas.withColumn("idade_comprador", col("idade_comprador").cast("int"))

df_vendas.filter(col("unidade_idade_comprador") == -1) \
                        .orderBy(desc("idade_comprador")) \
                        .display()

In [0]:
df_vendas = df_vendas.drop("unidade_idade_comprador")

In [0]:
df_vendas = df_vendas.withColumn(
    "data_venda",
    to_date(
        concat(
            col("ano_venda"),
            lit("-"),
            lpad(col("mes_venda"), 2, "0"),  # Preenche o mês com 0s à esquerda
            lit("-"),
            lpad(lit("1"), 2, "0")  # Preenche o dia com 0s à esquerda
        ),
        "yyyy-MM-dd"
    )
)

df_vendas.display()


df_vendas.printSchema()

## Reordenando as colunas da tabela

##Criação de schema caso ele não exista

In [0]:
%sql
USE CATALOG controlled_medication_data;
CREATE SCHEMA IF NOT EXISTS silver_layer;

## Salvando tabela na silver layer

In [0]:
df_vendas.write.mode("overwrite").option("mergeSchema", "true").saveAsTable("controlled_medication_data.silver_layer.vendas")