In [0]:
query = '''SELECT owner_username as autor, slug 
FROM silver.tabnews.assunto
WHERE body IS NULL
LIMIT 50'''
resposta = spark.sql(query).collect()

In [0]:
import aiohttp
import asyncio
from typing import Optional, Dict, Any, List, Tuple
import logging

# Configuração básica de logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

async def buscar_conteudo_TabNews_async(session: aiohttp.ClientSession, autor: str, slug: str) -> Tuple[Optional[Dict[str, Any]], str]:
    """
    Busca o conteúdo do TabNews de forma assíncrona com tratamento de erros detalhado.

    Args:
        session: A sessão aiohttp para reutilização de conexão.
        autor: O autor do conteúdo.
        slug: O slug do conteúdo.

    Returns:
        Uma tupla contendo:
        - Um dicionário com os dados do conteúdo, ou None em caso de erro.
        - Uma string com a URL da requisição (para rastreamento e logs).
    """
    url = f'https://www.tabnews.com.br/api/v1/contents/{autor}/{slug}'
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resposta:
            #resposta.raise_for_status()  # Lança uma exceção para status HTTP de erro
            dados = await resposta.json()
            logging.info(f'Dados obtidos com sucesso para {url}')
            return dados, url
    except aiohttp.ClientError as erro:
        logging.error(f'Erro ao buscar dados da API do TabNews para {url}: {erro}')
        return None, url
    except asyncio.TimeoutError:
        logging.error(f'Timeout ao buscar dados da API do TabNews para {url}')
        return None, url
    except Exception as erro:
        logging.exception(f'Erro inesperado ao buscar dados da API do TabNews para {url}: {erro}')
        return None, url

async def buscar_conteudo_TabNews(autor: str, slug: str) -> Tuple[Optional[Dict[str, Any]], str]:
    """
    Função para buscar o conteúdo do TabNews, encapsulando a sessão aiohttp.

    Args:
        autor: O autor do conteúdo.
        slug: O slug do conteúdo.

    Returns:
        Uma tupla contendo:
        - Um dicionário com os dados do conteúdo, ou None em caso de erro.
        - Uma string com a URL da requisição.
    """
    async with aiohttp.ClientSession() as session:
        return await buscar_conteudo_TabNews_async(session, autor, slug)

# Função para processar uma única resposta de forma assíncrona
async def processar_resposta(r) -> Dict[str, Any]:
    """
    Processa uma resposta individual, buscando o conteúdo do TabNews e tratando possíveis erros.

    Args:
        r: Um objeto contendo os atributos 'autor' e 'slug'.

    Returns:
        Um dicionário contendo os dados da resposta, ou um dicionário de erro.
    """
    autor, slug = r.autor, r.slug
    dados, url = await buscar_conteudo_TabNews(autor, slug)
    if dados:
        return {'url': url, 'data': dados}
    else:
        return {'url': url, 'error': 'Falha ao obter dados'}

# Função principal que será chamada para processar todas as respostas
async def processar_todas_respostas(resposta: List[Any]) -> List[Dict[str, Any]]:
    """
    Processa todas as respostas de forma assíncrona, utilizando asyncio.gather para paralelização.

    Args:
        resposta: Uma lista de objetos, cada um contendo os atributos 'autor' e 'slug'.

    Returns:
        Uma lista de dicionários, cada um contendo os dados da resposta ou informações de erro.
    """
    tarefas = [processar_resposta(r) for r in resposta]
    resultados = await asyncio.gather(*tarefas, return_exceptions=True)  # Captura exceções individuais
    
    # Tratamento de possíveis exceções não capturadas dentro de processar_resposta
    resultados_tratados = []
    for resultado in resultados:
        if isinstance(resultado, Exception):
            logging.error(f'Exceção não tratada: {resultado}')
            resultados_tratados.append({'error': str(resultado)})
        else:
            resultados_tratados.append(resultado)
    
    return resultados_tratados

# Uso em um notebook Jupyter
async def main(resposta: List[Any]) -> List[Dict[str, Any]]:
    """
    Função principal para execução em ambiente Jupyter Notebook.

    Args:
        resposta: Uma lista de objetos, cada um contendo os atributos 'autor' e 'slug'.

    Returns:
        Uma lista de dicionários contendo os resultados do processamento.
    """
    return await processar_todas_respostas(resposta)

# Supondo que 'resposta' já existe e contém os dados
# Para executar, use:
resultados_assync = await main(resposta)
# (Em um ambiente Jupyter Notebook com suporte a asyncio)

In [0]:
import pandas as pd

def flatten_item(item):
    """
    Extrai informações relevantes de um item, movendo os dados aninhados para o nível superior.

    Args:
        item (dict): Um dicionário contendo a URL, dados (se existirem) e erros (se existirem).

    Returns:
        dict: Um dicionário "achatado" com a URL, dados extraídos e informações de erro.
    """
    flat = {'url': item['url']}  # Inicializa o dicionário flat com a URL do item.
    flat.update(item.get('data', {}))  # Adiciona os dados do item, se existirem, ao dicionário flat.
    flat['error'] = item.get('error')  # Adiciona informações de erro, se existirem, ao dicionário flat.
    return flat

# Cria um DataFrame pandas a partir dos dados "achatados".
df_assync = pd.DataFrame([flatten_item(item) for item in resultados_assync])


In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col

# Carregue a tabela existente Delta
delta_table = DeltaTable.forName(spark, "silver.tabnews.assunto")
df_assync_spark = spark.createDataFrame(df_assync)
# Realize o merge usando o Delta Lake
delta_table.alias("target") \
    .merge(
        df_assync_spark.alias("updates"),
        "target.id = updates.id"
    ) \
    .whenMatchedUpdate(set={
        "url": col("updates.url"),
        "body": col("updates.body")
    }) \
    .whenNotMatchedInsert(values={
        "id": col("updates.id"),
        "url": col("updates.url"),
        "body": col("updates.body")
    }) \
    .execute()