In [6]:
# importando bibliotecas

In [7]:
import os
import string
import re
import unidecode

In [8]:
from core import spark, F, T

In [9]:
# configurando pasta de input
# isso é firula pra nao ultrapassar o limite da pep (79)

In [10]:
datalake = '/media/fabio/19940C2755DB566F/PAMepi/datalake/'
raw_version = 'raw_data_covid19_version-2022-01-25'
folder_sg = 'data-notificacao_sindrome_gripal/'

sg_input = os.path.join(datalake, raw_version, folder_sg)

In [11]:
raw = spark.read.csv('/home/fabio/Downloads/sindrome_gripal/', header=True)

                                                                                

In [12]:
raw.show(1, vertical=True, truncate=False)

-RECORD 0------------------------------
 id                       | 8OQItgBNg6 
 estadoNotificacao        | São Paulo  
 estadoNotificacaoIBGE    | null       
 municipioNotificacao     | São Paulo  
 municipioNotificacaoIBGE | null       
 profissionalSaude        | null       
 profissionalSeguranca    | null       
 cbo                      | null       
 sexo                     | null       
 racaCor                  | null       
 estado                   | null       
 estadoIBGE               | null       
 municipio                | null       
 municipioIBGE            | null       
 dataNotificacao          | null       
 sintomas                 | null       
 outrosSintomas           | null       
 dataInicioSintomas       | null       
 condicoes                | null       
 estadoTeste              | null       
 dataTeste                | null       
 tipoTeste                | null       
 resultadoTeste           | null       
 classificacaoFinal       | null       


In [13]:
def remove_special_character(row):
    try:
        chars = re.escape(string.punctuation)
        row_sub = re.sub(r'['+chars+']', ' ', row)
        row_uni = unidecode.unidecode(row_sub).lower()
        row_split = row_uni.split(' ')
        for s in range(len(row_split)):
            try:
                row_split.remove('')
            except ValueError:
                pass
        return ' '.join(row_split)
    except TypeError:
        return None

In [14]:
remove_special_character_udf = F.udf(remove_special_character, T.StringType())

In [15]:
integer_columns = [
    'estadoNotificacaoIBGE', 'municipioNotificacaoIBGE',
    'municipioIBGE', 'cnes', 'idade',
]

string_columns = [
    'id', 'cbo', 'estadoNotificacao', 'municipioNotificacao',
    'profissionalSaude', 'profissionalSeguranca', 'sexo', 'racaCor', 
    'sintomas', 'outrosSintomas', 'condicoes', 'estadoTeste', 'tipoTeste',
    'resultadoTeste', 'classificacaoFinal', 'evolucaoCaso','estadoIBGE',
]

date_columns = [
    'dataNotificacao', 'dataInicioSintomas', 'dataTeste', 'dataEncerramento',
]

In [16]:
# apenas confirmados e que não tenham sido cancelado

In [17]:
cases_conf = raw.filter((raw['classificacaoFinal'].startswith('Conf')) &
                        (raw['evolucaoCaso'] != 'Cancelado'))

In [18]:
for col in string_columns:
    cases_conf = cases_conf.withColumn(col, remove_special_character_udf(col))

In [19]:
for col in integer_columns:
    cases_conf = cases_conf.withColumn(col, F.col(col).cast(T.IntegerType()))

In [20]:
for col in date_columns:
    cases_conf = cases_conf.withColumn(col, F.col(col).cast(T.DateType()))

In [21]:
cases_conf = cases_conf.withColumn(
    'sexo', F.when(F.col('sexo').startswith('f'), 1) \
    .when(F.col('sexo').startswith('m'), 2) \
    .when(F.col('sexo').startswith('i'), 9) \
    .otherwise(F.lit(None)))

In [22]:
cases_conf = cases_conf.withColumn(
    'racaCor', F.when(F.col('racaCor').startswith('ignorado'), 9) \
    .when(F.col('racaCor').startswith('pret'), 1) \
    .when(F.col('racaCor').startswith('indigena'), 2) \
    .when(F.col('racaCor').startswith('amarel'), 3) \
    .when(F.col('racaCor').startswith('branc'), 4) \
    .when(F.col('racaCor').startswith('pard'), 5) \
    .otherwise(F.lit(None)))

In [23]:
cases_conf = cases_conf.withColumn(
    'tipoTeste', F.when(F.col('tipoTeste') == 'quimioluminescencia clia', 1) \
    .when(F.col('tipoTeste') == 'teste rapido antigeno', 2) \
    .when(F.col('tipoTeste') \
          .startswith('imunoensaio por eletroquimioluminescencia'), 3) \
    .when(F.col('tipoTeste').startswith('enzimaimunoensaio'), 4) \
    .when(F.col('tipoTeste') == 'teste rapido anticorpo', 5) \
    .when(F.col('tipoTeste') == 'rt pcr', 6) \
    .otherwise(F.col('tipoTeste')))

In [24]:
cases_conf = cases_conf.withColumn(
    'evolucaoCaso', F.when(F.col('evolucaoCaso') == 'ignorado', 9) \
    .when(F.col('evolucaoCaso') == 'internado em uti', 2) \
    .when(F.col('evolucaoCaso') == 'internado', 3) \
    .when(F.col('evolucaoCaso') == 'em tratamento domiciliar', 4) \
    .when(F.col('evolucaoCaso') == 'cura', 5) \
    .when(F.col('evolucaoCaso') == 'obito', 6) \
    .otherwise(F.col('evolucaoCaso')))

In [25]:
cases_conf = cases_conf.withColumn(
    'classificacaoFinal', F.when(F.col('classificacaoFinal') \
                                 .endswith('confirmado'), 1) \
    .when(F.col('classificacaoFinal').endswith('clinico'), 2) \
    .when(F.col('classificacaoFinal').endswith('imagem'), 3) \
    .when(F.col('classificacaoFinal').endswith('epidemiologico'), 4) \
    .when(F.col('classificacaoFinal').endswith('laboratorial'), 5) \
    .otherwise(F.col('classificacaoFinal')))

In [26]:
cases_conf = cases_conf.withColumn(
    'resultadoTeste', F.when(F.col('resultadoTeste') == 'positivo', 1) \
    .when(F.col('resultadoTeste') == 'negativo', 2) \
    .when(F.col('resultadoTeste') == 'inconclusivo ou indeterminado', 3) \
    .otherwise(F.col('resultadoTeste')))

In [27]:
# escrevendo apenas casos confirmados 

In [28]:
cases_conf.write.csv('sindrome_gripal_conf_', mode='overwrite', header=True)

                                                                                

In [29]:
# lendo apenas casos confirmados

In [30]:
raw = spark.read.csv('sindrome_gripal_conf_', header=True)

In [31]:
# alguns ajustes para Felipe

In [32]:
raw = raw.withColumn(
    'dataNotificacao', F.when(
        (F.col('dataNotificacao').isNull()) &
        (F.col('dataInicioSintomas') >= '2020-03-01') &
        (F.col('dataInicioSintomas') <= '2022-01-25'),
        F.col('dataInicioSintomas')) \
    
    .when(
        ((F.col('dataNotificacao') < '2020-03-01') |
        (F.col('dataNotificacao') > '2022-01-25')) &
        (F.col('dataInicioSintomas') >= '2020-03-01') &
        (F.col('dataInicioSintomas') <= '2022-01-25'),
        F.col('dataInicioSintomas')) \
    
    .when(
        ((F.col('dataNotificacao').isNull()) |
        (F.col('dataNotificacao') < '2020-03-01') |
        (F.col('dataNotificacao') > '2022-01-25')) &
        (F.col('dataTeste') >= '2020-03-01') &
        (F.col('dataTeste') <= '2022-01-25'),
        F.col('dataTeste') \
        
    ).otherwise(F.col('dataNotificacao')))

In [33]:
raw = raw.withColumn(
    'dataInicioSintomas', F.when(
        (F.col('dataInicioSintomas').isNull()) &
        (F.col('dataNotificacao') >= '2020-02-23') &
        (F.col('dataNotificacao') <= '2022-01-25'),
        F.col('dataNotificacao')) \
    
    .when(
        ((F.col('dataInicioSintomas') < '2020-02-23') |
        (F.col('dataInicioSintomas') > '2022-01-25')) &
        (F.col('dataNotificacao') >= '2020-03-01') &
        (F.col('dataNotificacao') <= '2022-01-25'),
        F.col('dataNotificacao')) \
    
    .when(
        ((F.col('dataInicioSintomas').isNull()) |
        (F.col('dataInicioSintomas') < '2020-02-23') |
        (F.col('dataInicioSintomas') > '2022-01-25')) &
        (F.col('dataTeste') >= '2020-03-01') &
        (F.col('dataTeste') <= '2022-01-25'),
        F.col('dataTeste') \
        
    ).otherwise(F.col('dataInicioSintomas')))

In [34]:
columns = [
    'dataNotificacao', 'dataInicioSintomas', 'dataEncerramento', 'estadoIBGE',
    'estadoNotificacao', 'municipioIBGE', 'municipioNotificacao', 'sexo',
    'racaCor', 'idade', 'condicoes', 'evolucaoCaso'
]

In [35]:
raw.filter(raw['dataNotificacao'] <= '2021-11-15').select(columns) \
.coalesce(1).write.csv('sindrome_gripal_again', mode='overwrite', header=True)

                                                                                