In [14]:
import os
import re
import sqlite3
import uuid
from datetime import datetime, timedelta
from typing import Callable

import requests
from camelot import read_pdf
from dotenv import load_dotenv
from pandas import (
    Categorical,
    DataFrame,
    Series,
    concat,
    isna,
    read_csv,
    set_option,
    to_datetime,
    to_numeric,
)
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams
from sentence_transformers import SentenceTransformer

load_dotenv()  # Optional: covers setups where the editor (e.g. VS Code) does not inject env vars such as NEWS_API_KEY

# Show every column when a DataFrame is displayed (None lifts pandas' column limit)
set_option("display.max_columns", None)

# Extraindo o schema de dados do dicionário de dados

Para este caso em específico, eu não fiz nenhuma estrutura de processamento de texto porque o intuito era extrair a informação de forma rápida.

Seria possível criar uma pipeline de extração de conteúdo cru com uma lib como `pdfplumber`, extraindo o texto bruto e iterando sobre ele para coletar nome, tipo e info das colunas + metadados opcionais, mas optei por agilidade na solução. Teoricamente, isso é para ser feito uma vez só, então não gastei muitos recursos em cima disso, como faria numa solução real.

In [2]:
def df_to_markdown_table(df: DataFrame) -> str:
    """
    Render a DataFrame as a pipe-style markdown table.

    Columns that are entirely null are dropped, remaining nulls become empty
    strings, and cells of object-dtype columns are stripped of surrounding
    whitespace. The input frame is not modified.

    Args:
        df (pd.DataFrame): Table to render.

    Returns:
        str: Markdown table using the column labels as header, without the index.

    Note:
        Rendering goes through ``DataFrame.to_markdown``, which requires the
        optional ``tabulate`` package to be installed.
    """
    table = df.dropna(axis=1, how="all").fillna("")

    # Clean data by removing excessive whitespace
    for col in table.columns:
        if table[col].dtype == "object":
            table[col] = table[col].astype(str).str.strip()

    # `tabulate` itself is not imported in this notebook; pandas calls it
    # internally with headers="keys" and showindex=index.
    return table.to_markdown(index=False, tablefmt="pipe")


def is_continuation_row(row: Series) -> bool:
    """
    Tell whether a row looks like the tail of a record split by a page break.

    The row qualifies when the cells at positions 0, 1 and 2 are all blank and
    at least one of the cells at positions 3 and 4 holds content. A cell is
    blank when it is null or contains only whitespace.

    Args:
        row (pd.Series): One table row, read positionally.

    Returns:
        bool: True for a continuation row, False otherwise.

    Note:
        Rows with fewer than 5 cells are never continuation rows.
        Cells beyond position 4 are not inspected.
    """

    def blank(position: int) -> bool:
        cell = row.iloc[position]
        return isna(cell) or str(cell).strip() == ""

    if len(row) < 5:
        return False

    # Leading block must be completely blank
    for position in (0, 1, 2):
        if not blank(position):
            return False

    # Position 2 is known to be blank at this point, so only positions 3 and 4
    # can supply the required content.
    return not (blank(3) and blank(4))


def merge_continuation_row(target_df: DataFrame, continuation_row: Series) -> None:
    """
    Merge a continuation row into the last row of the target dataframe.

    Cells are matched by position. For every non-blank cell of the
    continuation row:
    - if the matching cell of the last row is blank, it receives the value;
    - otherwise both texts are stripped and joined with a single space.

    Args:
        target_df (pd.DataFrame): The dataframe whose last row will be updated
        continuation_row (pd.Series): The row containing continuation data

    Returns:
        None: Modifies target_df in-place

    Note:
        If target_df is empty, the function returns without making changes.
        Cells of the continuation row beyond the target's column count are ignored.
        Access is positional, so duplicated index or column labels do not
        affect which cell gets updated.
    """

    def is_blank(value) -> bool:
        # `isna` is the name imported from pandas at the top of the notebook;
        # there is no `pd` alias in scope.
        return isna(value) or str(value).strip() == ""

    if len(target_df) == 0:
        return

    last_pos = len(target_df) - 1
    shared_width = min(len(continuation_row), len(target_df.columns))

    for col_pos in range(shared_width):
        continuation_value = continuation_row.iloc[col_pos]

        # Skip if continuation cell is empty
        if is_blank(continuation_value):
            continue

        existing_value = target_df.iloc[last_pos, col_pos]

        if is_blank(existing_value):
            # Target cell is empty: take the continuation value as is
            target_df.iloc[last_pos, col_pos] = continuation_value
        else:
            # Both have content: concatenate with a single space
            existing = str(existing_value).strip()
            new_content = str(continuation_value).strip()
            target_df.iloc[last_pos, col_pos] = f"{existing} {new_content}"


def process_single_dataframe(df: DataFrame, previous_df: DataFrame | None) -> DataFrame:
    """
    Process a single dataframe, checking for continuation rows and handling merging.

    If the first row of the dataframe is identified as a continuation row and there's
    a previous dataframe available, the continuation row is merged with the last row
    of the previous dataframe and removed from the current dataframe.

    Args:
        df (pd.DataFrame): The dataframe to process
        previous_df (Optional[pd.DataFrame]): The previous dataframe for potential merging.
                                            None if this is the first dataframe.

    Returns:
        pd.DataFrame: A copy of the processed dataframe with continuation rows removed
                     if they were merged with the previous dataframe

    Note:
        If df is empty, returns an empty copy.
        The previous_df is modified in-place if a merge occurs.
    """
    if len(df) == 0:
        return df.copy()

    current_df = df.copy()
    first_row = current_df.iloc[0]

    # Check if first row is a continuation row and we have a previous dataframe
    if is_continuation_row(first_row) and previous_df is not None:
        # Merge with previous dataframe
        merge_continuation_row(previous_df, first_row)
        # Remove the merged row from current dataframe
        current_df = current_df.iloc[1:].copy()

    return DataFrame(current_df)


def merge_split_rows(dfs: list[DataFrame]) -> list[DataFrame]:
    """
    Stitch together rows that were split across consecutive dataframes.

    Each dataframe (typically one table per PDF page) is processed in order;
    a leading continuation row is merged into the last row of the most recent
    non-empty processed dataframe.

    Args:
        dfs (List[pd.DataFrame]): Dataframes to process, in page order.

    Returns:
        List[pd.DataFrame]: Processed dataframes. Frames that end up empty are
            left out of the result.

    Note:
        The input dataframes are not modified; processing works on copies.

    Example:
        >>> dfs = [df_page1, df_page2, df_page3]
        >>> merged_dfs = merge_split_rows(dfs)
        >>> final_df = pd.concat(merged_dfs, ignore_index=True)
    """
    if not dfs:
        return []

    merged: list[DataFrame] = []

    for page_df in dfs:
        # Merge target: the latest frame already accepted, if any
        previous = merged[-1] if merged else None
        cleaned = process_single_dataframe(page_df, previous)
        if len(cleaned) > 0:
            merged.append(cleaned)

    return merged


def clean_markdown_text(text):
    """
    Normalise whitespace in markdown text for LLM consumption.

    Every line is stripped of leading/trailing whitespace. Blank lines at the
    start of the text are dropped and runs of consecutive blank lines are
    collapsed into a single blank line.
    """
    kept = []
    previous_blank = True  # start-of-text counts as blank, so leading blanks are skipped

    for raw_line in text.split("\n"):
        stripped = raw_line.strip()
        if stripped == "" and previous_blank:
            continue
        kept.append(stripped)
        previous_blank = stripped == ""

    return "\n".join(kept)

In [54]:
try:
    # Parse every table found in the data-dictionary PDF
    content = read_pdf(
        "../docs/dicionario-de-dados-2019-a-2025.pdf", pages="all", line_scale=40
    )
    # The first extracted table is skipped (presumably not part of the
    # dictionary itself -- TODO confirm); the rest are treated as one table.
    dfs = [table.df for table in content][1:]
    if not dfs:
        raise ValueError("No data-dictionary tables were extracted from the PDF")

    # The first row of the first kept table holds the column names for all of them
    header = dfs[0].iloc[0]
    for df in dfs:
        df.columns = header
    dfs[0] = dfs[0].drop(index=0)  # the header row is not data

    all_df = concat(merge_split_rows(dfs), ignore_index=True)
    df_list = ["## Concatenated Table\n\n" + df_to_markdown_table(all_df) + "\n"]

    all_df["Tipo"] = all_df["Tipo"].str.replace("\n", "")
    all_df.to_csv("../docs/data_dict.csv", index=False)
    full_markdown = "\n".join(df_list)
    print("Successfully extracted markdown tables from the PDF.")
    cleaned_markdown = clean_markdown_text(full_markdown)  # .split("\n")
    print("Markdown conversion completed successfully!")
except Exception as e:
    print(f"Error extracting tables: {e}")
    # Later cells need `all_df`; stop here instead of failing there with a NameError
    raise

Successfully extracted markdown tables from the PDF.
Markdown conversion completed successfully!


## Extraindo bases de dados e dividindo entre ambientes
- Em um cenário real, eu enviaria esses dados para um Data Lake para que ficassem acessíveis ao time.
- Vamos fingir que o diretório `data` é o nosso Lake.
- Um SQLite também simula um Postgres; se sobrar tempo, eu troco por um Postgres em um contêiner Docker.

In [12]:
def get_column_type_mapping(data_dict_df: DataFrame) -> dict[str, str]:
    """
    Translate the declared types of the data dictionary into pandas dtype names.

    Reads the ``DBF`` (column name) and ``Tipo`` (declared type) columns. Spaces
    are removed from the declared type, which is then matched case-insensitively
    against an ordered rule list; the first matching rule wins and anything
    unmatched falls back to ``"string"``. Rows where either value is null are
    skipped. When a name repeats, the last row decides.

    Returns:
        dict[str, str]: Column name -> one of ``"datetime64[ns]"``,
            ``"float64"``, ``"category"`` or ``"string"``.
    """
    # Ordered (pattern, dtype) rules; single-character text fields are treated
    # as categories, longer ones as plain strings.
    rules = (
        (r"Date|Data", "datetime64[ns]"),
        (r"Number\(\d+\)", "float64"),
        (r"Varchar2?\(1\)", "category"),
        (r"Varchar2?\(\d+\)", "string"),
    )
    fallback = "string"  # malformed or unrecognised declarations

    mapping = {}
    for name, declared in zip(data_dict_df["DBF"], data_dict_df["Tipo"]):
        if isna(name) or isna(declared):  # type: ignore
            continue

        compact = str(declared).strip().replace(" ", "")
        mapping[name] = next(
            (
                dtype
                for pattern, dtype in rules
                if re.search(pattern, compact, re.IGNORECASE)
            ),
            fallback,
        )

    return mapping


def create_converters(dtype_mapping: dict[str, str]) -> dict[str, Callable]:
    """
    Build one conversion callable per column from a dtype mapping.

    Supported dtype names are ``"datetime64[ns]"`` (parsed as day/month/year,
    invalid values coerced), ``"float64"`` (invalid values coerced),
    ``"category"`` and ``"string"``. Columns mapped to any other dtype name get
    no converter.

    Returns:
        dict[str, Callable]: Column name -> callable taking the values to convert.
    """

    def as_datetime(x):
        return to_datetime(x, format="%d/%m/%Y", errors="coerce")

    def as_number(x):
        return to_numeric(x, errors="coerce")

    def as_category(x):
        return Categorical(x)

    def as_string(x):
        return x.astype("string")

    by_dtype = {
        "datetime64[ns]": as_datetime,
        "float64": as_number,
        "category": as_category,
        "string": as_string,
    }

    return {
        col_name: by_dtype[dtype]
        for col_name, dtype in dtype_mapping.items()
        if dtype in by_dtype
    }


def apply_dtype_conversions(df: DataFrame, dtype_mapping: dict[str, str]) -> DataFrame:
    """
    Return a copy of ``df`` with the mapped columns cast to their target dtypes.

    Columns named in ``dtype_mapping`` but absent from ``df`` are ignored, as
    are target dtype names other than ``"datetime64[ns]"``, ``"float64"``,
    ``"category"`` and ``"string"``. Dates are parsed as day/month/year; values
    that cannot be parsed as dates or numbers are coerced to missing. A column
    whose conversion raises is left unchanged and a warning is printed.
    """
    casters = {
        "datetime64[ns]": lambda values: to_datetime(
            values, format="%d/%m/%Y", errors="coerce"
        ),
        "float64": lambda values: to_numeric(values, errors="coerce"),
        "category": lambda values: values.astype("category"),
        "string": lambda values: values.astype("string"),
    }

    result = df.copy()

    for col_name, target_dtype in dtype_mapping.items():
        caster = casters.get(target_dtype)
        if caster is None or col_name not in result.columns:
            continue

        try:
            result[col_name] = caster(result[col_name])
        except Exception as e:
            print(
                f"Warning: Could not convert column {col_name} to {target_dtype}: {e}"
            )

    return result


def optimize_for_target_format(df: DataFrame, target_format: str) -> DataFrame:
    """
    Return a copy of ``df`` adjusted for the given storage format.

    Args:
        df (pd.DataFrame): Frame to adapt; it is copied, not modified.
        target_format (str): Case-insensitive format name.
            - ``"parquet"``: object columns where fewer than 50% of the rows
              hold distinct values are cast to ``category``.
            - ``"sqlite"``: ``category`` columns are cast to ``string``.
            - anything else: the copy is returned unchanged.

    Returns:
        pd.DataFrame: The adapted copy.

    Note:
        An empty frame is returned as-is for ``"parquet"``; the uniqueness
        ratio is undefined without rows.
    """
    df_optimized = df.copy()
    fmt = target_format.lower()

    if fmt == "parquet":
        row_count = len(df_optimized)
        # Guard against 0 / 0 when the frame has object columns but no rows
        if row_count:
            for col in df_optimized.select_dtypes(include=["object"]).columns:
                if df_optimized[col].nunique() / row_count < 0.5:  # < 50% unique
                    df_optimized[col] = df_optimized[col].astype("category")

    elif fmt == "sqlite":
        # SQLite doesn't support pandas categories
        for col in df_optimized.select_dtypes(include=["category"]).columns:
            df_optimized[col] = df_optimized[col].astype("string")

    return df_optimized

In [66]:
# Target dtypes come from the data dictionary extracted above
dtype_mapping = get_column_type_mapping(all_df)

# One typed frame per yearly file (two-digit years 19..24)
DFS: list[DataFrame] = []
for y in range(19, 25):
    print(f"Reading year 20{y} dataset")
    source_url = f"https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SRAG/20{y}/INFLUD{y}-26-06-2025.csv"
    DFS.append(
        apply_dtype_conversions(
            read_csv(source_url, sep=";", encoding="latin1", low_memory=False),
            dtype_mapping,
        )
    )

Reading year 2019 dataset
Reading year 2020 dataset
Reading year 2021 dataset
Reading year 2022 dataset
Reading year 2023 dataset
Reading year 2024 dataset


In [67]:
# Persist each yearly frame twice: as CSV and as snappy-compressed parquet
os.makedirs("../data/raw", exist_ok=True)  # the writers below do not create missing folders
for df, year in zip(DFS, range(19, 25)):
    print(f"Saving year 20{year} dataset")
    df.to_csv(f"../data/raw/srag_20{year}.csv", index=False, date_format="%Y-%m-%d")
    # Parquet copy gets low-cardinality object columns cast to category first
    optimize_for_target_format(df, "parquet").to_parquet(
        f"../data/raw/srag_20{year}.parquet", index=False, compression="snappy"
    )

Saving year 2024 dataset
Saving year 2024 dataset
Saving year 2024 dataset
Saving year 2024 dataset
Saving year 2024 dataset
Saving year 2024 dataset


In [None]:
print("Merging raw data into one database")
os.makedirs("../data/interim", exist_ok=True)  # the writers below do not create missing folders
merged_df = concat(DFS, ignore_index=True)
merged_df.to_csv(
    "../data/interim/srag_2019_2024.csv", index=False, date_format="%Y-%m-%d"
)
merged_df.to_parquet(  # Not optimized because the dataset has not been cleaned yet
    "../data/interim/srag_2019_2024.parquet", index=False, compression="snappy"
)

# Only datetime columns get an explicit SQL type. Entries must be strings:
# pandas' SQLite writer rejects a dtype dict that contains None values.
timestamp_columns = {
    col: "TIMESTAMP"
    for col, dtype in merged_df.dtypes.items()
    if dtype == "datetime64[ns]"
}

conn = sqlite3.connect("../data/interim/srag_2019_2024.db")
try:
    # `with conn` only commits or rolls back the transaction; it does not close
    # the connection, and closing inside the block would make that commit fail.
    with conn:
        optimize_for_target_format(merged_df, "sqlite").to_sql(
            "srag_cases",
            conn,
            if_exists="replace",
            index=False,
            dtype=timestamp_columns or None,
        )
finally:
    conn.close()

## Usando NEWS API e salvando embeddings

In [10]:
class NewsAPICollector:
    def __init__(self):
        self.BASE_URL = "https://newsapi.org/v1/everything"
        self.API_KEY = os.getenv("NEWS_API_KEY")  # `apiKey` field

    def fetch_articles(
        self,
        query: str,
        language: str = "en",
        sort_by: str = "relevancy",  # Use 'relevancy' instead of 'popularity'
        page_size: int = 100,
        days_back: int = 29,
    ) -> list[dict[str, str]]:
        """
        Fetch articles from News API for a specific query
        """
        to_date = datetime.now()
        from_date = to_date - timedelta(days=min(days_back, 29))  # Ensure max 29 days

        try:
            response = requests.get(
                self.BASE_URL,
                params={
                    "q": query,
                    "language": language,
                    "sortBy": sort_by,  # 'relevancy', 'popularity', or 'publishedAt'
                    "pageSize": min(page_size, 100),  # Max 100 per request
                    "from": from_date.strftime("%Y-%m-%d"),
                    "to": to_date.strftime("%Y-%m-%d"),
                    "apiKey": self.API_KEY,
                },
            )
            response.raise_for_status()
            data = response.json()

            if data["status"] == "ok":
                return data["articles"]
            else:
                print(f"API Error: {data.get('message', 'Unknown error')}")
                return []

        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            raise e

    def preprocess_articles(
        self, articles: list[dict[str, str]]
    ) -> list[dict[str, str | dict[str, str]]]:
        """
        Clean and preprocess articles for RAG system
        """
        return [
            {
                "id": f"news_{hash(article['url'])}",
                # Combine title, description, and content
                "text": f"{article.get('title', '')} {article.get('description', '')} {article.get('content', '')}".strip(),
                "title": article.get("title", ""),
                "url": article.get("url", ""),
                "published_at": article.get("publishedAt", ""),
                "source": article.get("source", {}).get("name", ""),
                "metadata": {
                    "source_type": "news_api",
                    "topic": "sars_cov",
                    "url": article.get("url", ""),
                    "published_date": article.get("publishedAt", ""),
                    "source_name": article.get("source", {}).get("name", ""),
                },
            }
            for article in articles
            if not article.get("content") or article["content"] == "[Removed]"
        ]

In [11]:
class QdrantRAGSystem:
    def __init__(
        self,
        qdrant_url: str = "http://localhost:6333",
        collection_name: str = "sars_cov_news",
        embedding_model: str = "all-MiniLM-L6-v2",
    ):
        self.client = QdrantClient(url=qdrant_url)
        self.collection_name = collection_name
        self.encoder = SentenceTransformer(embedding_model)
        self.vector_size = self.encoder.get_sentence_embedding_dimension()
        self._create_collection()  # Create collection if it doesn't exist

    def _create_collection(self):
        """Create Qdrant collection for storing embeddings"""
        try:
            # Check if collection exists
            if self.collection_name not in [
                col.name for col in self.client.get_collections().collections
            ]:
                self.client.create_collection(
                    collection_name=self.collection_name,
                    vectors_config=VectorParams(
                        size=self.vector_size, distance=Distance.COSINE
                    ),
                )
                print(f"Created collection: {self.collection_name}")
            else:
                print(f"Collection {self.collection_name} already exists")

        except Exception as e:
            print(f"Error creating collection: {e}")

    def add_documents(self, documents: list[dict[str, str | dict[str, str]]]):
        """Add documents to Qdrant with embeddings"""
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                # Generate embedding for the text
                vector=self.encoder.encode(doc["text"]).tolist(),
                payload={
                    "text": doc["text"],
                    "title": doc["title"],
                    "url": doc["url"],
                    "metadata": doc["metadata"],
                },
            )
            for doc in documents
        ]  # Create points for Qdrant
        try:  # Batch upload to Qdrant
            self.client.upsert(collection_name=self.collection_name, points=points)
            print(f"Successfully added {len(points)} documents to Qdrant")
        except Exception as e:
            print(f"Error adding documents to Qdrant: {e}")

    def search_similar(self, query: str, limit: int = 5) -> list[dict]:
        """Search for similar documents using vector similarity"""
        try:
            return [
                {
                    "score": result.score,
                    "text": result.payload["text"],
                    "title": result.payload["title"],
                    "url": result.payload["url"],
                    "metadata": result.payload["metadata"],
                }
                for result in self.client.search(
                    collection_name=self.collection_name,
                    query_vector=self.encoder.encode(query).tolist(),
                    limit=limit,
                )
            ]
        except Exception as e:
            print(f"Error searching documents: {e}")
            return []


In [12]:
class SARSCoVRAGPipeline:
    """Ties the news collector to the Qdrant-backed index: ingest, then query."""

    def __init__(self, qdrant_url: str = "http://localhost:6333"):
        self.news_collector = NewsAPICollector()
        self.rag_system = QdrantRAGSystem(qdrant_url=qdrant_url)

    def collect_and_index_news(self, query: str = "SARS-CoV-2 OR COVID-19"):
        """Fetch articles for ``query``, preprocess them and add them to the index.

        Progress is reported with ``print``; nothing is returned. When the
        collector yields no articles, the method stops after saying so.
        """
        print("Fetching articles from News API...")
        articles = self.news_collector.fetch_articles(query=query)

        if articles:
            print(f"Found {len(articles)} articles. Preprocessing...")
            processed_docs = self.news_collector.preprocess_articles(articles)
            print(f"Processed {len(processed_docs)} articles. Adding to Qdrant...")
            self.rag_system.add_documents(processed_docs)
            print("Pipeline completed successfully!")
        else:
            print("No articles found")

    def query_rag_system(self, user_query: str, num_results: int = 3):
        """Search the index for ``user_query`` and print a ranked summary of the hits.

        Each hit is shown with its title, score (3 decimals), source name, URL
        and the first 200 characters of its text. Nothing is returned.
        """
        print(f"Searching for: {user_query}")
        hits = self.rag_system.search_similar(user_query, num_results)

        if not hits:
            print("No relevant articles found")
            return

        print(f"\nFound {len(hits)} relevant articles:")
        print("=" * 50)

        rank = 0
        for hit in hits:
            rank += 1  # ranks are 1-based
            print(f"\n{rank}. {hit['title']}")
            print(f"   Relevance Score: {hit['score']:.3f}")
            print(f"   Source: {hit['metadata']['source_name']}")
            print(f"   URL: {hit['url']}")
            print(f"   Preview: {hit['text'][:200]}...")
            print("-" * 50)

In [13]:
# Ingest fresh news, then run a few sample questions against the index
pipeline = SARSCoVRAGPipeline()
pipeline.collect_and_index_news()

sample_queries = [
    "What are the latest variants of SARS-CoV-2?",
    "COVID-19 vaccine effectiveness",
    "Long COVID symptoms and treatment",
    "SARS-CoV-2 transmission mechanisms",
]

for query in sample_queries:
    print(f"\n{'=' * 60}")  # visual separator between queries
    pipeline.query_rag_system(query, num_results=3)

Error creating collection: [WinError 10061] Nenhuma conexão pôde ser feita porque a máquina de destino as recusou ativamente
Fetching articles from News API...


NameError: name 'datetime' is not defined