# Juntando as Peças 🧩

## Raspagem de Dados ✂️

### Características dos Estados

<pre>Vamos baixar os dados das características dos estados para o nosso sistema de arquivos... nós os utilizaremos no decorrer das aulas!</pre>

👉 dados de <a href='https://www.ibge.gov.br/cidades-e-estados/'>características dos estados brasileiros</a>

#### Funções Auxiliares

In [1]:
import os, json, requests
from bs4 import BeautifulSoup
import requests, urllib3, ssl

def build_path(subfolder = 'raw'):
    """Return the absolute path of ``../project/data/<subfolder>``, creating it if missing.

    The path is resolved relative to the current working directory: one level
    up, then ``project/data/<subfolder>``.

    Args:
        subfolder: name of the folder inside ``project/data`` (default ``'raw'``).

    Returns:
        The absolute, normalised folder path as a string.
    """
    folderpath = os.path.abspath(
        os.path.join(os.getcwd(), os.pardir, 'project', 'data', subfolder))
    # exist_ok=True replaces the former "check, then create" sequence, which
    # could fail if the folder appeared between the check and the creation.
    os.makedirs(folderpath, exist_ok=True)
    return folderpath

"""
workaround para contornar a exceção: \
    SSLError: [SSL: UNSAFE_LEGACY_RENEGOTIATION_DISABLED] \
    unsafe legacy renegotiation disabled (_ssl.c:997)

é possível considerando que estamos tratando apenas de dados públicos
"""

class CustomHttpAdapter (requests.adapters.HTTPAdapter):
    """Transport adapter whose connection pool uses a caller-supplied SSL context."""

    def __init__(self, ssl_context=None, **kwargs):
        """Store ``ssl_context`` and forward the remaining options to HTTPAdapter."""
        # The context must be stored before the base initialiser runs,
        # because the base initialiser builds the pool manager.
        self.ssl_context = ssl_context
        super().__init__(**kwargs)

    def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
        """Create the urllib3 pool manager, injecting the stored SSL context.

        ``**pool_kwargs`` mirrors the base-class signature so that extra pool
        options passed by the caller are forwarded instead of raising TypeError.
        """
        self.poolmanager = urllib3.poolmanager.PoolManager(
            num_pools=connections, maxsize=maxsize,
            block=block, ssl_context=self.ssl_context, **pool_kwargs)

def get_legacy_session():
    """Build a requests session whose HTTPS connections allow legacy renegotiation.

    Returns:
        A session with a ``CustomHttpAdapter`` mounted on ``https://``.
    """
    legacy_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    # 0x4 is the OP_LEGACY_SERVER_CONNECT flag.
    legacy_context.options = legacy_context.options | 0x4
    adapter = CustomHttpAdapter(legacy_context)

    legacy_session = requests.session()
    legacy_session.mount('https://', adapter)
    return legacy_session

def scrape_from_internet(url, lower_state, timeout=30):
    """Download the HTML page ``<url>/<lower_state>.html`` and return its raw body.

    Args:
        url: base address of the site, without a trailing slash.
        lower_state: page name (without ``.html``), appended to ``url``.
        timeout: seconds to wait for the server before giving up; without a
            timeout a stalled server would block the call indefinitely.

    Returns:
        The response body as bytes.
    """
    state_url = f'{url}/{lower_state}.html'
    try:
        response = requests.get(state_url, timeout=timeout)
    except requests.exceptions.SSLError:
        # Retry through the session that tolerates legacy TLS renegotiation.
        response = get_legacy_session().get(state_url, timeout=timeout)
    return response.content

def html_indicators(content):
    """Parse ``content`` as HTML and return every ``<div class="indicador">`` element."""
    soup = BeautifulSoup(content, 'html.parser')
    return soup.find_all('div', class_='indicador')

def parse_indicator(indicator):
    """Extract label, value and unit of measure from one indicator element.

    Args:
        indicator: element exposing ``find(name, class_=...)``; it must contain
            a ``div.ind-label`` and a ``p.ind-value`` and may contain a
            ``span.indicador-unidade`` holding the unit.

    Returns:
        A dict with the keys ``'label'``, ``'value'`` and ``'measure'``;
        ``'measure'`` is an empty string when the indicator has no unit.
    """
    label = indicator.find('div', class_='ind-label')
    value = indicator.find('p', class_='ind-value').get_text(strip=False)
    unit_tag = indicator.find('span', class_='indicador-unidade')
    unit = unit_tag.get_text(strip=True) if unit_tag is not None else ''

    # The value text is cut at the unit. When there is no unit, or the unit
    # span is empty, cut at the non-breaking space instead: str.split('')
    # would raise ValueError.
    separator = unit or '\xa0'

    return {'label': label.get_text(strip=True),
            'value': value.split(separator)[0].strip(),
            'measure': unit.strip()}

def parse_indicators(all_indicators):
    """Return the list produced by applying ``parse_indicator`` to each element, in order."""
    return [parse_indicator(element) for element in all_indicators]

def parse_page(url, lower_state):
    """Download the page for ``lower_state`` and return its parsed indicators.

    Chains the three steps: fetch the HTML, locate the indicator elements,
    convert each element into a dict.
    """
    page_elements = html_indicators(scrape_from_internet(url, lower_state))
    return parse_indicators(page_elements)

def access_pages_and_save(url, input_filename, output_filename):
    """Scrape the indicators of every state listed in a JSON file and save them as JSON.

    Both files live in the folder returned by ``build_path()``.

    Args:
        url: base address passed on to ``parse_page``.
        input_filename: JSON file holding a sequence of objects, each with a
            ``'sigla'`` key.
        output_filename: JSON file to write; it receives a list of
            ``{'state': <lower-cased sigla>, 'characteristics': [...]}`` objects.
    """
    folderpath = build_path()
    inputpath = os.path.join(folderpath, input_filename)
    outputpath = os.path.join(folderpath, output_filename)

    # Explicit UTF-8 keeps the result independent of the platform's default
    # locale encoding.
    with open(inputpath, encoding='utf-8') as jsonfile:
        lower_states = [json_state['sigla'].lower()
                        for json_state in json.load(jsonfile)]

    json_states_characteristics = [
        {'state': lower_state,
         'characteristics': parse_page(url, lower_state)}
        for lower_state in lower_states
    ]

    with open(outputpath, mode='w', encoding='utf-8') as jsonfile:
        json.dump(json_states_characteristics, jsonfile)

#### Código

In [2]:
# Scrape the indicator page of every state listed in estados_codigos.json and
# store the combined result in estados_caracteristicas.json.
access_pages_and_save(url = 'https://ibge.gov.br/cidades-e-estados',
                      input_filename = 'estados_codigos.json',
                      output_filename = 'estados_caracteristicas.json')

### Dados de casos e óbitos por covid-19

<pre>Vamos baixar os dados de casos e óbitos por covid-19 no Brasil para o nosso sistema de arquivos... nós os utilizaremos no decorrer das aulas!</pre>

👉 dados de <a href='https://covid.saude.gov.br/'>casos e óbitos por covid-19 no Brasil</a>

#### Funções Auxiliares

In [3]:
!pip install --upgrade selenium --quiet


[notice] A new release of pip available: 22.3.1 -> 23.2.1
[notice] To update, run: python3.11 -m pip install --upgrade pip


In [4]:
import os, time, csv
from zipfile import ZipFile
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By

In [5]:
def file_exists(filename_start = 'HIST_PAINEL_COVID'):
    """Look in the ``tmp`` data folder for a ``.zip`` file whose name starts with ``filename_start``.

    Returns:
        The full path of the last matching entry in ``os.listdir`` order, or
        ``None`` when no entry matches.
    """
    folderpath = build_path('tmp')
    matches = [
        os.path.join(folderpath, name)
        for name in os.listdir(folderpath)
        if name.startswith(filename_start) and name.endswith('.zip')
    ]
    return matches[-1] if matches else None

def open_browser_and_save(url = 'https://covid.saude.gov.br/',
                          filename_start = 'HIST_PAINEL_COVID',
                          download_timeout = None):
    """Open ``url`` in Chrome, click the 'Arquivo CSV' element and wait for the download.

    Args:
        url: page that contains the download button.
        filename_start: prefix of the downloaded file; also used to clear old
            downloads from the ``tmp`` folder and to detect the new one.
        download_timeout: maximum number of seconds to wait for the ``.zip``
            to appear; ``None`` (default) waits indefinitely.

    Returns:
        The full path of the downloaded ``.zip`` file.

    Raises:
        TimeoutError: if ``download_timeout`` is set and elapses first.
    """
    folderpath = build_path('tmp')

    # Clear earlier downloads so the wait loop below cannot pick up a stale file.
    for file in os.listdir(folderpath):
        if file.startswith(filename_start):
            os.remove(os.path.join(folderpath, file))

    options = Options()
    prefs = {'download.default_directory' : folderpath}
    options.add_experimental_option('prefs', prefs)
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)

        button = driver.find_element(By.XPATH,
                                     "//*[contains(text(), 'Arquivo CSV')]")
        # NOTE(review): fixed pause before the click, presumably to let the
        # page finish rendering - confirm whether an explicit wait is needed.
        time.sleep(2)
        button.click()

        # Poll once per second until the finished .zip shows up in the folder.
        deadline = (None if download_timeout is None
                    else time.monotonic() + download_timeout)
        compressed_filepath = None
        while compressed_filepath is None:
            if deadline is not None and time.monotonic() > deadline:
                raise TimeoutError(
                    f'download of {filename_start}*.zip did not finish '
                    f'within {download_timeout} seconds')
            time.sleep(1)
            compressed_filepath = file_exists(filename_start)
    finally:
        # quit() ends the whole browser session, including after a failure.
        driver.quit()

    return compressed_filepath

def extract_folder(compressed_filepath, 
                   filename_start = 'HIST_PAINEL_COVID'):
    """Extract a ``.zip`` archive into its own folder, then delete the archive.

    A path that does not end in ``.zip`` is left untouched; previously such a
    file was deleted without being extracted.

    Args:
        compressed_filepath: full path of the archive.
        filename_start: unused; kept so existing callers keep working.
    """
    if not compressed_filepath.endswith('.zip'):
        return

    target_folder = os.path.dirname(compressed_filepath)
    # The context manager closes the archive; no explicit close() is needed.
    with ZipFile(compressed_filepath, 'r') as archive:
        archive.extractall(path=target_folder)

    # Reached only after a successful extraction.
    os.remove(compressed_filepath)

def concatenate_datasets(filename_start = 'HIST_PAINEL_COVID'):
    """Merge the matching ``;``-delimited CSV files from ``tmp`` into one file in ``raw``.

    Every file in the ``tmp`` data folder whose name starts with
    ``filename_start`` and ends with ``.csv`` is read; all rows are written to
    ``ALL_<filename_start>.csv`` in the ``raw`` data folder. The source files
    are left in place.

    Args:
        filename_start: prefix shared by the CSV part files.

    Raises:
        FileNotFoundError: if no matching CSV file exists in ``tmp``.
    """
    source_folder = build_path('tmp')

    # Sorted so the row order of the output does not depend on the arbitrary
    # order in which os.listdir returns entries.
    csv_names = sorted(
        name for name in os.listdir(source_folder)
        if name.startswith(filename_start) and name.endswith('.csv'))
    if not csv_names:
        # Fail before the output file is created or truncated.
        raise FileNotFoundError(
            f"no '{filename_start}*.csv' file found in {source_folder}")

    header, rows = None, []
    for name in csv_names:
        filepath = os.path.join(source_folder, name)
        # newline='' lets the csv module handle line endings itself.
        with open(filepath, encoding='utf-8', newline='') as csvfile:
            csvreader = csv.DictReader(csvfile, delimiter=';')
            # The header of the last file read is the one written out.
            header = csvreader.fieldnames
            rows.extend(csvreader)

    target_folder = build_path('raw')
    filepath = os.path.join(target_folder, 'ALL_' + filename_start + '.csv')
    with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=header, delimiter=';')
        writer.writeheader()
        writer.writerows(rows)

#### Código

In [6]:
# Download the zip export, unpack it in its own folder, then merge the
# extracted CSV parts into a single file in the 'raw' data folder.
compressed_filepath = open_browser_and_save()
extract_folder(compressed_filepath)
concatenate_datasets()

## API 📝

### Dados de Vacinação contra covid-19

<pre>Vamos baixar os dados de vacinação contra covid-19 para o nosso sistema de arquivos... nós os utilizaremos no decorrer das aulas!</pre>

👉 dados de <a href='https://servicos-datasus.saude.gov.br/detalhe/CddynnsgE2'>imunização contra covid-19 no Brasil</a>

#### Função Auxiliar

In [7]:
!pip install --upgrade elasticsearch --quiet


[notice] A new release of pip available: 22.3.1 -> 23.2.1
[notice] To update, run: python3.11 -m pip install --upgrade pip


In [8]:
import os, json
from elasticsearch import Elasticsearch

In [9]:
def access_elasticsearch_and_save(url, filename):
    """Run a per-location, per-day aggregation on Elasticsearch and save it as JSON.

    The grouping is chosen from ``filename``: when it contains ``'estado'``
    the documents are grouped by ``paciente_endereco_uf``, otherwise by
    ``paciente_endereco_coIbgeMunicipio``. Inside each group the documents
    are bucketed per calendar day of ``vacina_dataAplicacao``.

    Args:
        url: host name of the Elasticsearch server (HTTPS, port 443).
        filename: output file name inside the folder returned by ``build_path()``.
    """
    folderpath = build_path()
    filepath = os.path.join(folderpath, filename)

    # NOTE(review): credentials are hard-coded; presumably the public
    # read-only account of the service - confirm before reusing elsewhere.
    es = Elasticsearch([{'scheme': 'https', 'host': url, 'port': 443}],
                       basic_auth=('imunizacao_public',
                                   'qlto5t&7r_@+#Tlstigi'),
                       request_timeout=60)

    # Notes from the original author: only cities above 300k inhabitants are
    # in the index, and querying more than 83 municipalities makes the API
    # raise "ApiError".

    # Decide the grouping once, so the bucket name, field and size always
    # agree (the size used to be chosen by a different substring test).
    if 'estado' in filename:
        bucket_name = 'Paciente_Estado'
        bucket_field = 'paciente_endereco_uf'
        bucket_size = 27  # presumably the number of federative units - confirm
    else:
        bucket_name = 'Paciente_Municipio'
        bucket_field = 'paciente_endereco_coIbgeMunicipio'
        bucket_size = 60

    aggs = {
        bucket_name: {
            'terms': {'field': bucket_field,
                      'min_doc_count': 1,
                      'order': {'_count': 'desc'},
                      'size': bucket_size},
            'aggs': {
                'Data_Aplicacao_Vacina': {
                    'date_histogram': {'field': 'vacina_dataAplicacao',
                                       'calendar_interval': 'day',
                                       'min_doc_count': 1,
                                       'order': {'_key': 'desc'}},
                }
            }
        }
    }

    # size=0: only the aggregation result is requested, no individual hits.
    res = es.search(size = 0, aggs = aggs)

    with open(filepath, mode='w', encoding='utf-8') as jsonfile:
        json.dump(res['aggregations'], jsonfile)

#### Código

In [10]:
# The file name contains 'estado', so the aggregation is grouped by state.
access_elasticsearch_and_save('imunizacao-es.saude.gov.br', 
                              'estados_vacinacao.json')

In [11]:
# The file name does not contain 'estado', so the aggregation is grouped by
# municipality.
access_elasticsearch_and_save('imunizacao-es.saude.gov.br', 
                              'municipios_vacinacao.json')