<a href="https://colab.research.google.com/github/carolinajacoby/farmacovigilanciaSUS/blob/main/MestradoSparkNotificacoes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1- Configuração do ambiente de programação



In [1]:
import os

In [2]:
# Instalar e testar versão do Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
! java --version

openjdk 11.0.25 2024-10-15
OpenJDK Runtime Environment (build 11.0.25+9-post-Ubuntu-1ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.25+9-post-Ubuntu-1ubuntu122.04, mixed mode, sharing)


In [3]:
# Baixar o Apache Spark
! wget https://dlcdn.apache.org/spark/spark-3.4.4/spark-3.4.4-bin-hadoop3.tgz
# Descompactar
! tar xf spark-3.4.4-bin-hadoop3.tgz
# Configurar o Apache Spark
! pip install -q findspark


--2025-01-07 19:38:06--  https://dlcdn.apache.org/spark/spark-3.4.4/spark-3.4.4-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 388988563 (371M) [application/x-gzip]
Saving to: ‘spark-3.4.4-bin-hadoop3.tgz’


2025-01-07 19:38:13 (50.6 MB/s) - ‘spark-3.4.4-bin-hadoop3.tgz’ saved [388988563/388988563]



In [4]:
#Apagar o arquivo compactado
! rm -rf spark-3.4.4-bin-hadoop3.tgz

In [5]:
#Configuração as variáveis de ambiente
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.4.4-bin-hadoop3'


In [6]:
! pip install -q findspark # pacote para procurar o spark na máquina e informar ao os onde encontra-lo

In [7]:
# Inicializar o Spark e criar uma sessão
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName ('MestradoSpark'). getOrCreate()

# 2- Leitura do CSV vigimed_notificacoes

In [8]:
df_notificacoes = spark.read.csv(
    '/content/drive/MyDrive/mestrado_carol/dissertacao/codigo/vigimed/vigimed_notificacoes.csv',
    sep=';',
    header=True,
    inferSchema=True,
    encoding='ISO-8859-1'
)


In [9]:
df_notificacoes.describe().show(20)

+-------+------+--------------------+--------------------+-------------------------+---------------------+-----------------------+-------------------+--------------------+------------------------+--------------------+--------------------+-----------+--------------------------------+------------+--------+--------+-----------------+------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------------+------------------------+--------------------+-----------+
|summary|    UF|TIPO_ENTRADA_VIGIMED|         RECEBIDO_DE|IDENTIFICACAO_NOTIFICACAO|DATA_INCLUSAO_SISTEMA|DATA_ULTIMA_ATUALIZACAO|   DATA_NOTIFICACAO|    TIPO_NOTIFICACAO|NOTIFICACAO_PARENT_CHILD|     DATA_NASCIMENTO|IDADE_MOMENTO_REACAO|GRUPO_IDADE|IDADE_GESTACIONAL_MOMENTO_REACAO|        SEXO|GESTANTE|LACTANTE|          PESO_KG|         ALTURA_CM|REACAO_EVENTO_ADVERSO_MEDDRA|               GRAVE|          

In [10]:
from pyspark.sql.functions import col

# Inicializar uma lista para armazenar os resultados
resultados = []

# Obter o esquema das colunas
for field in df_notificacoes.schema.fields:
    # Nome e tipo da coluna
    nome_coluna = field.name
    tipo_coluna = field.dataType.simpleString()

    # Contar o número de registros não nulos (count)
    count = df_notificacoes.filter(col(nome_coluna).isNotNull()).count()

    # Obter estatísticas descritivas, como min, max, etc.
    stats = df_notificacoes.describe(nome_coluna).collect()
    stats_dict = {row['summary']: row[nome_coluna] for row in stats}

    # Adicionar ao resultado
    resultados.append({
        "Coluna": nome_coluna,
        "Tipo": tipo_coluna,
        "Quantidade de Linhas": count,
        "Mínimo": stats_dict.get("min"),
        "Máximo": stats_dict.get("max"),
        "Média": stats_dict.get("mean"),
        "Desvio Padrão": stats_dict.get("stddev")
    })

# Converter resultados em um DataFrame para visualização
resultado_df = spark.createDataFrame(resultados)

# Exibir a tabela
resultado_df.show(truncate=False)


+--------------------------------+------------------+--------------------------------------------------------------------------------+--------------------+----------------------------------------------+--------------------+------+
|Coluna                          |Desvio Padrão     |Máximo                                                                          |Média               |Mínimo                                        |Quantidade de Linhas|Tipo  |
+--------------------------------+------------------+--------------------------------------------------------------------------------+--------------------+----------------------------------------------+--------------------+------+
|UF                              |null              |TO                                                                              |null                | C                                            |164199              |string|
|TIPO_ENTRADA_VIGIMED            |null              |VigiFlow eForms        

# 3 - Pré processamento de dados

REMOÇÃO DE RUIDOS


*   ACENTOS
*   BR-ANVISA
* _x000D_


In [11]:
from pyspark.sql.functions import regexp_replace, col, upper, to_date

# Função para remover acentos
def remover_acentos(df, colunas):
    for coluna in colunas:
        df = df.withColumn(coluna, regexp_replace(col(coluna), "[áàãâä]", "A"))
        df = df.withColumn(coluna, regexp_replace(col(coluna), "[éèêë]", "E"))
        df = df.withColumn(coluna, regexp_replace(col(coluna), "[íìîï]", "I"))
        df = df.withColumn(coluna, regexp_replace(col(coluna), "[óòõôö]", "O"))
        df = df.withColumn(coluna, regexp_replace(col(coluna), "[úùûü]", "U"))
        df = df.withColumn(coluna, regexp_replace(col(coluna), "[ç]", "C"))
        df = df.withColumn(coluna, regexp_replace(col(coluna), "[ñ]", "N"))
    return df



In [12]:
# Identificar as colunas de texto
colunas_texto = [col for col, dtype in df_notificacoes.dtypes if dtype == "string"]

# Remover "BR-ANVISA-", "_x000D_", e acentos
for coluna in colunas_texto:
    df_notificacoes = df_notificacoes.withColumn(coluna, regexp_replace(col(coluna), "BR-ANVISA-", ""))
    df_notificacoes = df_notificacoes.withColumn(coluna, regexp_replace(col(coluna), "_x000D_", ""))
df_notificacoes = remover_acentos(df_notificacoes, colunas_texto)




TRANSFORMAÇÃO DE TODOS REGISTROS PARA MAIÚSCULA PARA PADRONIZAR

In [13]:
# Transformar todos os registros para maiúsculas
for coluna in colunas_texto:
    df_notificacoes = df_notificacoes.withColumn(coluna, upper(col(coluna)))


REMOÇÃO DE ESPAÇOS EM BRANCO

In [14]:
from pyspark.sql.functions import trim

# Remover espaços em branco de colunas de texto
colunas_texto = [
    "UF", "TIPO_ENTRADA_VIGIMED", "RECEBIDO_DE", "TIPO_NOTIFICACAO",
    "NOTIFICACAO_PARENT_CHILD", "IDADE_MOMENTO_REACAO", "GRUPO_IDADE",
    "IDADE_GESTACIONAL_MOMENTO_REACAO", "REACAO_EVENTO_ADVERSO_MEDDRA",
    "GRAVIDADE", "DESFECHO", "DURACAO", "RELACAO_MEDICAMENTO_EVENTO",
    "NOME_MEDICAMENTO_WHODRUG", "ACAO_ADOTADA", "NOTIFICADOR"
]

# Aplicar trim para remover espaços em branco no início e no fim
for coluna in colunas_texto:
    df_notificacoes = df_notificacoes.withColumn(coluna, trim(col(coluna)))


TRATAMENTO DE VALORES AUSENTES

In [15]:
from pyspark.sql.functions import col, count, when

# Número total de linhas no DataFrame
total_rows = df_notificacoes.count()

# Contar valores nulos e calcular o percentual de nulos em cada coluna
missing_percentage = df_notificacoes.select(
    [
        ((count(when(col(c).isNull(), c)) / total_rows) * 100).alias(c)
        for c in df_notificacoes.columns
    ]
)

# Exibir os resultados
missing_percentage.show()


+-----------------+--------------------+------------------+-------------------------+---------------------+-----------------------+------------------+----------------+------------------------+-----------------+--------------------+-----------------+--------------------------------+---------------+-----------------+-----------------+-----------------+----------------+----------------------------+------------------+------------------+------------------+------------------+-----------------+------------------+--------------------------+------------------------+-----------------+-----------------+
|               UF|TIPO_ENTRADA_VIGIMED|       RECEBIDO_DE|IDENTIFICACAO_NOTIFICACAO|DATA_INCLUSAO_SISTEMA|DATA_ULTIMA_ATUALIZACAO|  DATA_NOTIFICACAO|TIPO_NOTIFICACAO|NOTIFICACAO_PARENT_CHILD|  DATA_NASCIMENTO|IDADE_MOMENTO_REACAO|      GRUPO_IDADE|IDADE_GESTACIONAL_MOMENTO_REACAO|           SEXO|         GESTANTE|         LACTANTE|          PESO_KG|       ALTURA_CM|REACAO_EVENTO_ADVERSO_MEDDRA|  

TRATAMENTO PARA COLUNAS COM + 75% DE VALORES AUSENTES:
SERÁ FEITO A EXCLUSÃO.

In [16]:
from pyspark.sql.functions import col, count, when

# Definir o limite de porcentagem (por exemplo, 50%)
threshold = 75.0

# Número total de linhas no DataFrame
total_rows = df_notificacoes.count()

# Calcular o percentual de valores nulos em cada coluna
columns_to_remove = []
for column in df_notificacoes.columns:
    null_count = df_notificacoes.filter(col(column).isNull()).count()
    null_percentage = (null_count / total_rows) * 100
    if null_percentage > threshold:
        columns_to_remove.append(column)

# Excluir as colunas identificadas
df_notificacoes = df_notificacoes.drop(*columns_to_remove)

# Exibir as colunas removidas
print(f"Colunas removidas: {columns_to_remove}")



Colunas removidas: ['NOTIFICACAO_PARENT_CHILD', 'ALTURA_CM']
