In [1]:
import time
import asyncio  # executar as requisições de forma asíncrona
from typing import List
from typing import Any

import bs4
import httpx
import tqdm.notebook  # barra de progresso
import pandas as pd

from bs4 import BeautifulSoup
from unidecode import unidecode # remover acentos
from tqdm.asyncio import tqdm_asyncio  # barra de progresso

In [2]:
# Classe erro customizada
# Estou usando essa classe para debugar o código
# toda vez que surge um erro
class CustomError(Exception):
    def __init__(self, message: str, *args: object, obj: Any = None) -> None:
        self.message = message
        self.object = obj
        super().__init__(*args)
    def __str__(self) -> str:
        message  = f"{self.message}"
        if self.object:
            message += f"; Object: {self.object}"
        return message

## Versão atualizada

Esta versão coleta as páginas em porções (*chunks*).

A cada porção coletada, os dados são extraídos e armazenados.

In [5]:
async def fetch_links(client, links):
    """Requere as páginas e retorna as respostas"""
    tasks = []
    responses = []

    for link in links:
        tasks.append(asyncio.create_task(client.get(link)))

    # Aguardar todas as tarefas acabarem
    await tqdm_asyncio.gather(*tasks)

    responses = [task.result() for task in tasks]
    return responses

## Funções de extração

As funções da célula a seguir são responsáveis por extraís os dados de cada página requisitada.

Uma função para cada seção da página.

In [33]:
# Cada função desta célula é especializada em coletar
# dados de porções diferentes das páginas

# NOTE: apenas tabela_1 retorna DataFrame.
# As outras retornam uma lista de DataFrame

def tabela_1(tabela: bs4.element.ResultSet) -> pd.DataFrame:
    """Coleta dados do elemento de id="tabs-1" """

    columns = []
    data = []
    for table_row in tabela[0].select('table')[0].select('tr'):
        try:
            table_data = table_row.select('td')
            if table_data[0].has_attr('bgcolor'):
                # remover acentos e por em caixa alta
                column_name = unidecode(table_data[0].get_text(strip=True).upper())
                # remover espacos e dois pontos:
                column_name = column_name.replace(" ","_").replace(":", "")
                # usamos get_text(strip=True) para evitar hex encoding (tipo \xa0)
                columns.append(column_name)
                data.append(table_data[1].get_text(strip=True))
        except IndexError:
            continue
    return pd.DataFrame([data], columns=columns)

def tabela_2(tabela: bs4.element.ResultSet) -> List[pd.DataFrame]:
    """Coleta dados do elemento de id="tabs-2" """
    
    dataframes_list = []
    
    sub_tabelas = tabela[1].select('td')[1].select("table")
    for sub_tabela in sub_tabelas:
        rows = sub_tabela.select('tr')
        if "Entrada d'água:" in rows[0].text:
            table_data = rows[1].select('td')
            columns = [table_data[0].get_text(strip=True)]
            data = [table_data[1].get_text(strip=True)]
            try:
                dataframes_list.append(pd.DataFrame([data], columns=columns))
            except (AssertionError, ValueError):
                continue
        else:
            if len(rows) > 1:
                table_data = rows[0].select('td')
                if table_data:
                    suffix = table_data[0].get_text(strip=True)
                    columns = []
                    data = []
                    if len(rows) == 2:
                        for td in rows[1].select('td'):
                            if ':' in td.get_text(strip=True):
                                name, value = td.get_text(strip=True).split(':')
                                if name:
                                    column_name = unidecode(f"{suffix}_{name}".upper())
                                    column_name = column_name.replace(" ","_").replace(":", "")
                                    columns.append(column_name)
                                    data.append(value)
                    elif len(rows) == 3:
                        for table_data_columns, table_data_rows in zip(rows[1], rows[2]):
                            name = table_data_columns.get_text(strip=True)
                            value = table_data_rows.get_text(strip=True)
                            if name != '':
                                column_name = unidecode(f"{suffix}_{name}".upper())
                                column_name = column_name.replace(" ","_").replace(":", "")
                                columns.append(column_name)
                                data.append(value)
                    dataframes_list.append(pd.DataFrame([data], columns=columns))
    return dataframes_list

def tabela_3(tabela: bs4.element.ResultSet) -> List[pd.DataFrame]:
    """Coleta dados do elemento de id="tabs-3" """
    
    dataframes_list = []
    # os dados estão apenas no segundo element td
    table_data = tabela[2].select('tr')[0].select('td')[1]
    columns_elements = table_data.select('font')
    table_elements = table_data.select('table')
    for col_elem, table_elem in zip(columns_elements, table_elements):
        suffix = col_elem.get_text(strip=True)
        tr = table_elem.select('tr')
        if len(tr) == 1:
            td = tr[0].select('td')
            name = f"{suffix}_{td[0].get_text(strip=True)}"
            column = unidecode(name).replace(' ', '_').replace(':', '').upper()
            value = td[1].get_text(strip=True)
            dataframes_list.append(pd.DataFrame([value], columns=(column,)))
        else:
            names = tr[0].select('td')
            values = tr[1].select('td')
            columns = []
            data = []
            for name in names:
                name = f"{suffix}_{name.get_text(strip=True)}"
                name = name.replace(' ', '_').replace(':', '').upper()
                columns.append(unidecode(name))
            for value in values:
                value = f"{suffix}_{value.get_text(strip=True)}"
                data.append(value)
            dataframes_list.append(pd.DataFrame([data], columns=columns))
    return dataframes_list

def tabela_4(tabela: bs4.element.ResultSet) -> List[pd.DataFrame]:
    """Coleta dados do elemento de id="tabs-4" """
    
    dataframes_list = []
    # os dados estão apenas no segundo element td
    table_data = tabela[3].select('tr')[0].select('td')[1]
    columns_elements = table_data.select('font')
    table_elements = table_data.select('table')

    suffix = columns_elements[0].get_text(strip=True)

    # tem um dado em células deslocadas (Aquífero)
    offset_row = table_elements[0].select('tr')

    # célula da esquerda
    offset_column = offset_row[0].select('td')[0]
    column, data = offset_column.get_text(strip=True).split(':')
    data = data.strip()
    column = unidecode(f"{suffix}_{column}").replace(' ', '').upper()
    dataframes_list.append(pd.DataFrame([data], columns=(column,)))

    # a segunda célula desloada (à direita de Aquífero)
    columns = []
    data = []
    for row in offset_row[0].select('td')[1].select('tr'):
        column_element, data_element = row.select('td')
        
        column = f"{suffix}_{column_element.get_text(strip=True)}"
        column = unidecode(column).upper()
        columns.append(column)
        data.append(data_element.get_text(strip=True))

    dataframes_list.append(pd.DataFrame([data], columns=columns))

    # tabela abaixo de Aquífero
    table_rows = table_elements[1].select('tr')
    suffix = columns_elements[1].get_text(strip=True)
    columns = []
    data = []
    for table_row in table_rows:
        try:
            column, value = table_row.select('td')
        except ValueError as err:
            if len(table_row.select('td')) == 3:
                raise CustomError('pular') from err
            raise ValueError from err
        column = f"{suffix}_{column.get_text(strip=True)}"
        column = column.replace(' ', '_').replace(':', '').upper()
        columns.append(column)
        data.append(value.get_text(strip=True))

    dataframes_list.append(pd.DataFrame([data], columns=columns))

    return dataframes_list

def tabela_5(tabela: bs4.element.ResultSet) -> List[pd.DataFrame]:
    """Coleta dados do elemento de id="tabs-5" """
    
    dataframes_list = []
    
    # os dados estão apenas no segundo element td
    table_data = tabela[4].select('tr')[0].select('td')[1]
    suffix_element = table_data.select('font')
    table_elements = table_data.select('table')
    suffix = suffix_element[0].get_text(strip=True)

    table_rows = table_elements[0].select('tr')
    columns = []
    data = []
    # aqui acabei percebendo um jeito mais elegante de coletar
    # os valores nos elementos tr. Talvez dê pra o caso
    # das tabelas anteriores
    for table_row in table_rows[::2]:
        columns.extend([a.get_text(strip=True) for a in table_row.select('td')])
    for table_row in table_rows[1::2]:
        data.extend([a.get_text(strip=True) for a in table_row.select('td')])

    for idx, (col, value) in enumerate(zip(columns, data)):
        if col != '' and value != '':
            columns[idx] = f"{suffix}_{col}".replace(' ', '_').replace(':', '')
            columns[idx] = unidecode(columns[idx]).upper()

    dataframes_list.append(pd.DataFrame([data], columns=columns))
    
    return dataframes_list

def tabela_6(tabela: bs4.element.ResultSet) -> List[pd.DataFrame]:
    """Coleta dados do elemento de id="tabs-6" """

    dataframes_list = []

    # os dados estão apenas no segundo element td
    table_data = tabelas[5].select('tr')[0].select('td')[1]
    suffix_element = table_data.select('font')  # título da segunda subtabela
    table_elements = table_data.select('table')

    # 1a sub tabela
    suffix = ''
    for table_element in table_elements:
        columns = []
        data = []
        for idx, table_row in enumerate(table_element.select('tr')):
            if idx == 0:
                suffix = table_row.get_text(strip=True)
                continue
            table_data = table_row.select('td')
            unidade = ''
            if len(table_data) == 3:
                column, value, unidade = table_data
                unidade = unidade.get_text(strip=True).replace(' ', '_')
            else:
                column, value = table_data
            column = f"{suffix}_{column.get_text(strip=True)}".replace(' ', '_').replace(':', '')
            columns.append(unidecode(f"{column}_{unidade}").upper())
            data.append(value.get_text(strip=True))
            
        dataframes_list.append(pd.DataFrame([data], columns=columns))
    
    return dataframes_list

## Principal

A célula a seguir contém o loop principal.

Nesta versão, o programa primeiramente requere `chunk_size` páginas, e então extrai e salva os dados, 
para só então fazer outras novas `chunk_size` requisições, logo depois de pausar a execução por `sleep_time` segundos.

In [None]:
# carregando arquivo de links
FILENAME = 'Atlantico_Sul_N_NE.txt'
LINKS = []
with open(FILENAME, encoding='utf-8') as file_:
    LINKS = file_.read().split('\n')

######
# NOTE: O PROGRAMA PAROU EM ALGUM PONTO? AJUSTE AQUI!
######

start = 2540
end = len(links)  # len(links)  # use len(links) para ir até o último link

######

chunk_size = 10  # A cada `chunk_size` requisições a execução será
sleep_time = 1   # pausada por `sleep_time` segundos

error = None

async with httpx.AsyncClient() as client:
    for i in tqdm.notebook.tqdm(range(start, end, chunk_size)):
        chunk = LINKS[i:i + chunk_size]

        chunk_responses = await fetch_links(client, chunk)

        ocorrencias = []

        # os dados serão extraídos e salvos a cada pedaço
        # antes de realizar o próximo pedaço de requisições
        for response_idx, response in enumerate(chunk_responses):

            ocorrencia = []
            soup = BeautifulSoup(response.text, features='lxml')
            # todas as tabelas (elementos dentro de elementos de id="tabs-n")
            tabelas = soup.select('#newsContent')

            try:
                ocorrencia.append(tabela_1(tabelas))
                ocorrencia.extend(tabela_2(tabelas))
                ocorrencia.extend(tabela_3(tabelas))
                ocorrencia.extend(tabela_4(tabelas))
                ocorrencia.extend(tabela_5(tabelas))
                ocorrencia.extend(tabela_6(tabelas))
            except CustomError as err:
                if err.message == 'pular':
                    continue

            ocorrencias.append(pd.concat(ocorrencia, axis=1))

        # salva a cada chunk_size páginas (para liberar a memória)
        fname = f"samples/sample_{i}_to_{i+chunk_size}.parquet"
        # remove colunas duplicadas
        dfs = [df.loc[:, ~df.columns.duplicated()].reset_index() for df in ocorrencias]
        # merge o resultado
        amostra = pd.concat(dfs).iloc[:, 1:]
        amostra.to_parquet(fname, engine="pyarrow", index=False)

        time.sleep(sleep_time)


In [None]:
from pathlib import Path

amostra = None
for parquet_file in Path().glob('samples/*.parquet'):
    if amostra is None:
        amostra = pd.read_parquet(parquet_file)
        continue
    amostra = pd.concat([amostra, pd.read_parquet(parquet_file)])

# resetando os índices (eles ficam repetidos)
amostra = amostra.reset_index().iloc[:, 1:]
amostra.head(5)
