# Script for the extraction of variables from SINAN Dengue database

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, StringType
from pyspark.sql.functions import year, month, col, sum, udf, substring, split, regexp_replace, when
import glob as gb
#from tqdm import tqdm
import re 
from functools import reduce

In [2]:
# Build the configuration before any SparkContext exists. In local mode the
# driver JVM is launched by the first SparkContext created in the kernel, so
# creating (and stopping) a throw-away context first would start that JVM with
# default settings and "spark.driver.memory" below would never take effect.
conf = (
    SparkConf()
    .setAppName("SINAM_dengue_cases")
    .setMaster("local[*]")
    .set("spark.executor.memory", "3GB")
    .set("spark.driver.memory", "20GB")
)

# getOrCreate() reuses the running session when this cell is executed again,
# so the context does not have to be stopped and recreated by hand.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
spark

In [20]:
# Location of the raw files and of the extraction output.
# `path` is a file-name prefix, not a directory: the year digit and a wildcard
# are appended to it when the files of each year are listed.
path = "/media/juliane_oliveira/My Passport/Datasus/Data_lake/Raw/Bancos_SINAN/DENGON/CSV_format/DENGN1"
path_to_export = "/media/juliane_oliveira/My Passport/Datalake/SINAN/Dengue/dengue_extractions/"

# Obtain the Spark session (an already running one is reused)
spark = (
    SparkSession.builder
    .appName("Dengue")
    .getOrCreate()
)


In [4]:
#Define functions used
#In the future these functions will be moved to a library
#function to bind all dfs
def unionAll(dfs):
    """Stack the DataFrames in ``dfs`` into a single DataFrame.

    Each DataFrame after the first is re-selected in the column order of the
    result accumulated so far and then appended to it with ``union``.
    """
    def _append(stacked, following):
        aligned = following.select(stacked.columns)
        return stacked.union(aligned)

    return reduce(_append, dfs)

def correcting_data(x):
    """Extend 7-character values with a "0" and a copy of their last character.

    Values of any other length are returned unchanged. ``None`` (a null cell)
    or any other input that cannot be measured, sliced or concatenated with a
    string yields ``None``.
    """
    try:
        if len(x) == 7:
            # The zero must be the string "0": adding the integer 0 to a string
            # raises TypeError, which used to be swallowed and turned every
            # 7-character value into None.
            # NOTE(review): the slice bound 9 is larger than the 7-character
            # input, so the whole value is kept - confirm the intended layout.
            return x[0:9] + "0" + x[-1]
        return x
    except Exception:
        # Best effort: a malformed cell becomes null instead of failing the job
        return None

In [10]:
#Columns selected from every file; the data are grouped and counted by them
cols_to_group = [
    "tp_not", "id_agravo",
    "year_sin_pri", "month_sin_pri",
    "dt_notific", "sem_not",
    "dt_sin_pri", "sem_pri",
    "classi_fin", "criterio",
    "sg_uf_not", "id_municip",
    "cs_sexo", "cs_gestant", "cs_raca", "cs_escol_n",
    "codmunres", "dt_nasc", "nu_idade_n", "comuninf",
]

In [11]:
#Empty dictionary that will receive the transformed data
data_dict = dict()

In [12]:
#Wrap the Python helper as a Spark UDF that returns strings
udf_correcting_data = udf(correcting_data, returnType=StringType())

In [13]:
def _banner(text):
    """Print ``text`` centred in a 112-character line filled with ``*``."""
    print('{:*{align}{width}}'.format(text, align='^', width='112'))


def _standardize_file(df):
    """Clean one raw file and count its rows per combination of ``cols_to_group``.

    The three date columns are cast to string and passed through
    ``udf_correcting_data``; ``year_sin_pri`` and ``month_sin_pri`` are cut out
    of ``dt_sin_pri``; ``id_mn_resi`` is renamed to ``codmunres`` when present.
    """
    #Cleaning date variables
    #NOTE(review): the cell output shows dt_notific as "2013-01-04 00:00:00" for
    #some years and "2010-01-15" for others, so the string cast does not give a
    #uniform format across files - confirm whether it should be normalised.
    for date_col in ("dt_sin_pri", "dt_nasc", "dt_notific"):
        df = df.withColumn(date_col, col(date_col).cast(StringType()))
        df = df.withColumn(date_col, udf_correcting_data(col(date_col)))

    #Characters 6-7 and 1-4 of dt_sin_pri, i.e. month and year of "yyyy-mm-dd" text
    df = df.withColumn("month_sin_pri", substring(col("dt_sin_pri"), 6, 2))
    df = df.withColumn("year_sin_pri", substring(col("dt_sin_pri"), 1, 4))

    #Change variable names
    if "id_mn_resi" in df.columns:
        df = df.withColumnRenamed("id_mn_resi", "codmunres")

    #Group data according to variables used
    return df.select(cols_to_group).groupBy(cols_to_group).count()


for i in range(0, 9):

    #The outer loop selects the files of one year.
    #The label is not called `year` so that the pyspark `year` function
    #imported at the top of the notebook is not overwritten.
    year_label = "201" + str(i)
    _banner('Starting to process ' + year_label)
    _banner('Reading files')
    name = path + str(i) + "*"
    files = gb.glob(name)

    #Read every file that belongs to the year into its own data frame
    dfs = [spark.read.csv(base, header = True, inferSchema = True) for base in files]
    print("Number of files read: {}".format(len(dfs)))

    #Without this check an empty year fails inside reduce() with an unrelated
    #"empty sequence" TypeError; report the real cause instead
    if not dfs:
        raise FileNotFoundError(
            "No files match {!r} for year {}".format(name, year_label))

    _banner('Processing files')

    #Standardize each file
    dfs = [_standardize_file(df) for df in dfs]

    #Perform the union of the standardized files
    dfs = unionAll(dfs)
    dfs.show(10, False)
    _banner('Biding files')
    data_dict.update({year_label: dfs})
_banner('done')

  

********************************************Starting to process 2010********************************************
*************************************************Reading files**************************************************
Number of files read: 1
************************************************Processing files************************************************
+------+---------+------------+-------------+----------+-------+----------+-------+----------+--------+---------+----------+-------+----------+-------+----------+---------+----------+----------+--------+-----+
|tp_not|id_agravo|year_sin_pri|month_sin_pri|dt_notific|sem_not|dt_sin_pri|sem_pri|classi_fin|criterio|sg_uf_not|id_municip|cs_sexo|cs_gestant|cs_raca|cs_escol_n|codmunres|dt_nasc   |nu_idade_n|comuninf|count|
+------+---------+------------+-------------+----------+-------+----------+-------+----------+--------+---------+----------+-------+----------+-------+----------+---------+----------+----------+--------+-----+
|2     

Number of files read: 2
************************************************Processing files************************************************
+------+---------+------------+-------------+-------------------+-------+----------+-------+----------+--------+---------+----------+-------+----------+-------+----------+---------+----------+----------+--------+-----+
|tp_not|id_agravo|year_sin_pri|month_sin_pri|dt_notific         |sem_not|dt_sin_pri|sem_pri|classi_fin|criterio|sg_uf_not|id_municip|cs_sexo|cs_gestant|cs_raca|cs_escol_n|codmunres|dt_nasc   |nu_idade_n|comuninf|count|
+------+---------+------------+-------------+-------------------+-------+----------+-------+----------+--------+---------+----------+-------+----------+-------+----------+---------+----------+----------+--------+-----+
|2     |A90      |2013        |01           |2013-01-04 00:00:00|201301 |2013-01-03|201301 |5         |1       |12       |120040    |F      |5         |1      |03        |120040   |1968-02-17|4044      |NA 

Number of files read: 1
************************************************Processing files************************************************
+------+---------+------------+-------------+-------------------+-------+----------+-------+----------+--------+---------+----------+-------+----------+-------+----------+---------+----------+----------+--------+-----+
|tp_not|id_agravo|year_sin_pri|month_sin_pri|dt_notific         |sem_not|dt_sin_pri|sem_pri|classi_fin|criterio|sg_uf_not|id_municip|cs_sexo|cs_gestant|cs_raca|cs_escol_n|codmunres|dt_nasc   |nu_idade_n|comuninf|count|
+------+---------+------------+-------------+-------------------+-------+----------+-------+----------+--------+---------+----------+-------+----------+-------+----------+---------+----------+----------+--------+-----+
|2     |A90      |2016        |02           |2016-02-22 00:00:00|201608 |2016-02-12|201606 |10        |1       |31       |310620    |F      |6         |2      |9         |310620   |1981-10-19|4034      |NA 

In [14]:
#Stack the yearly results for 2010 to 2018 into one data frame
dfs = unionAll([data_dict[str(yr)] for yr in range(2010, 2019)])
dfs.show()

+------+---------+------------+-------------+----------+-------+----------+-------+----------+--------+---------+----------+-------+----------+-------+----------+---------+----------+----------+--------+-----+
|tp_not|id_agravo|year_sin_pri|month_sin_pri|dt_notific|sem_not|dt_sin_pri|sem_pri|classi_fin|criterio|sg_uf_not|id_municip|cs_sexo|cs_gestant|cs_raca|cs_escol_n|codmunres|   dt_nasc|nu_idade_n|comuninf|count|
+------+---------+------------+-------------+----------+-------+----------+-------+----------+--------+---------+----------+-------+----------+-------+----------+---------+----------+----------+--------+-----+
|     2|      A90|        2010|           01|2010-01-15| 201002|2010-01-15| 201002|         1|       2|       51|    510340|      M|         6|      1|        03|   510340|1995-11-27|      4014|  510340|    1|
|     2|      A90|        2010|           01|2010-01-19| 201003|2010-01-12| 201002|         1|       1|       35|    350280|      F|         5|      2|        0

In [16]:
#Keep the rows whose classi_fin equals any of the listed codes; the equality
#tests are OR-ed together from left to right
dengue_conf = dfs.filter(
    reduce(lambda acc, cond: acc | cond,
           [dfs['classi_fin'] == code for code in (1, 10, 2, 11, 3, 4, 12)])
)

In [18]:
#Preview the first three rows
dengue_conf.take(3)

[Row(tp_not='2', id_agravo='A90', year_sin_pri='2010', month_sin_pri='01', dt_notific='2010-01-15', sem_not='201002', dt_sin_pri='2010-01-15', sem_pri='201002', classi_fin='1', criterio='2', sg_uf_not='51', id_municip='510340', cs_sexo='M', cs_gestant='6', cs_raca='1', cs_escol_n='03', codmunres='510340', dt_nasc='1995-11-27', nu_idade_n='4014', comuninf='510340', count=1),
 Row(tp_not='2', id_agravo='A90', year_sin_pri='2010', month_sin_pri='01', dt_notific='2010-01-19', sem_not='201003', dt_sin_pri='2010-01-12', sem_pri='201002', classi_fin='1', criterio='1', sg_uf_not='35', id_municip='350280', cs_sexo='F', cs_gestant='5', cs_raca='2', cs_escol_n='03', codmunres='350280', dt_nasc='1970-04-17', nu_idade_n='4039', comuninf='350280', count=1),
 Row(tp_not='2', id_agravo='A90', year_sin_pri='2010', month_sin_pri='01', dt_notific='2010-01-10', sem_not='201002', dt_sin_pri='2010-01-08', sem_pri='201001', classi_fin='1', criterio='2', sg_uf_not='29', id_municip='292740', cs_sexo='M', cs_ge

In [22]:
#Write the selection as CSV from a single partition (default writer options)
dengue_conf.coalesce(1).write.csv("dengue_conf.csv")

In [23]:
#Keep the rows whose classi_fin equals 5
dengue_desc = dfs.where(dfs.classi_fin == 5)

In [25]:
#Preview the first three rows
dengue_desc.take(3)

[Row(tp_not='2', id_agravo='A90', year_sin_pri='2010', month_sin_pri='01', dt_notific='2010-01-25', sem_not='201004', dt_sin_pri='2010-01-24', sem_pri='201004', classi_fin='5', criterio='1', sg_uf_not='31', id_municip='310620', cs_sexo='F', cs_gestant='5', cs_raca='9', cs_escol_n='NA', codmunres='310620', dt_nasc='1965-04-12', nu_idade_n='4044', comuninf='NA', count=1),
 Row(tp_not='2', id_agravo='A90', year_sin_pri='2010', month_sin_pri='01', dt_notific='2010-01-19', sem_not='201003', dt_sin_pri='2010-01-17', sem_pri='201003', classi_fin='5', criterio='1', sg_uf_not='33', id_municip='330455', cs_sexo='M', cs_gestant='6', cs_raca='1', cs_escol_n='06', codmunres='330455', dt_nasc='1982-03-30', nu_idade_n='4027', comuninf='NA', count=1),
 Row(tp_not='2', id_agravo='A90', year_sin_pri='2010', month_sin_pri='01', dt_notific='2010-01-23', sem_not='201003', dt_sin_pri='2010-01-23', sem_pri='201003', classi_fin='5', criterio='1', sg_uf_not='26', id_municip='260410', cs_sexo='F', cs_gestant='9

In [26]:
#Write the selection as CSV from a single partition (default writer options)
dengue_desc.coalesce(1).write.csv("dengue_desc")

In [None]:
#dengue_conf.coalesce(1).write.csv(path_to_export, header = True)