# Scrape PDFs from the Portuguese Parliament
---
Initial experiments in scraping proposals and voting tables from official PDFs published by the Portuguese Parliament.

## Setup

### Import libraries

In [None]:
import os
# Extend the process environment so the Homebrew-installed Ghostscript can be located.
# NOTE(review): PATH entries are normally directories, yet the last entry appended here
# ("/opt/homebrew/bin/gs") looks like the executable itself - confirm whether
# "/opt/homebrew/bin" was intended.
os.environ["PATH"] += ":/opt/homebrew/lib:/opt/homebrew/bin/gs"
os.environ["DYLD_LIBRARY_PATH"] = "/opt/homebrew/lib"   # this is needed for MacOS to find the Ghostscript library

In [None]:
from typing import Tuple, List
import math
import numpy as np
import pandas as pd
import camelot
from camelot.handlers import PDFHandler
from langchain.document_loaders import PyPDFLoader
from langchain.output_parsers import ListOutputParser
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from langchain.chat_models import ChatOpenAI
from getpass import getpass
from tqdm.auto import tqdm

### Parameters

In [None]:
# Number of seats held by each parliamentary party; the values below add up to 230.
# An "X" in a voting table is later expanded to the party's full seat count.
deputies_per_party = {
    "PS": 120,
    "PSD": 77,
    "CH": 12,
    "IL": 8,
    "PCP": 6,
    "BE": 5,
    "PAN": 1,
    "L": 1,
}
# Party tag substituted for the "(GOV)" marker when working out who authored a proposal.
gov = "PS"

In [None]:
# Local path of the voting-results PDF to process; machine-specific, adjust before running.
pdf_path = "/Users/adminuser/Downloads/XV_1_151_2023-07-07_ResultadoVotacoes_2023-07-07.pdf"

In [None]:
# Read the OpenAI API key from an interactive prompt (getpass does not echo the input).
OPENAI_API_KEY = getpass()

## Scrape proposals' text

In [None]:
def is_relevant_proposal(text: str) -> bool:
    """Check if a proposal is relevant for our analysis.

    A proposal is relevant when it names the voters (every party listed in
    ``deputies_per_party``, or the "GOV" tag) and also mentions a vote or an
    outcome ("favor", "contra", "abstenção", "aprovad...", "rejeitad...").

    Args:
        text (str): The text of the proposal.

    Returns:
        bool: True if the proposal is relevant, False otherwise.
    """
    lowered = text.lower()
    mentions_voters = "GOV" in text or all(
        party in text for party in deputies_per_party
    )
    # Every outcome word is matched against the lower-cased text, so the
    # capitalised "Aprovado"/"Rejeitado" spellings (the ones
    # clean_proposal_text splits on) are recognised as well.
    mentions_outcome = any(
        word in lowered
        for word in ("favor", "contra", "abstenção", "aprovad", "rejeitad")
    )
    return mentions_voters and mentions_outcome

In [None]:
def clean_proposal_text(text: str) -> str:
    """
    Process the description of a proposal, removing unnecessary text
    and symbols.

    Everything from the first "Aprovad" / "Rejeitad" marker onwards is
    dropped, line breaks are removed, runs of spaces are collapsed into a
    single space and every ";" becomes ".".

    Args:
        text (str): The text of the proposal.

    Returns:
        str: The processed text of the proposal.
    """
    description = text.split("Aprovad")[0].split("Rejeitad")[0]
    description = description.replace("\n", "")
    # One replace("  ", " ") pass still leaves doubled spaces behind when
    # three or more spaces are adjacent, so repeat until none remain.
    while "  " in description:
        description = description.replace("  ", " ")
    return description.replace(";", ".")

In [None]:
# Read the PDF as one Document per page.
# load() is used instead of load_and_split(): the latter additionally runs a
# text splitter over the pages, so its chunks need not line up with pages and
# consecutive chunks can overlap, which would duplicate or cut proposals.
loader = PyPDFLoader(pdf_path)
pages = loader.load()
pages

In [None]:
# Show the raw text extracted for the first entry of `pages`.
pages[0].page_content

In [None]:
# Split the third entry on the U+F0DE character and drop whatever precedes its first occurrence.
# presumably U+F0DE is the bullet glyph that introduces each proposal in this PDF - confirm
pages[2].page_content.split("\uf0de")[1:]

In [None]:
# Same split as above, keeping only the pieces accepted by is_relevant_proposal.
[text for text in pages[2].page_content.split("\uf0de")[1:] if is_relevant_proposal(text)]

In [None]:
# Same filter, with each surviving piece tidied by clean_proposal_text.
[clean_proposal_text(text) for text in pages[2].page_content.split("\uf0de")[1:] if is_relevant_proposal(text)]

In [None]:
# Gather every U+F0DE-delimited piece of text from all pages, in order, and
# remember the global positions of those that mention "unanimidade".
proposals = []
unanimous_idx = []
for page in pages:
    # Text before the first marker on a page is discarded.
    page_proposals = page.page_content.split("\uf0de")[1:]
    first_position = len(proposals)
    unanimous_idx.extend(
        first_position + offset
        for offset, piece in enumerate(page_proposals)
        if "unanimidade" in piece.lower()
    )
    proposals.extend(page_proposals)

In [None]:
# Display the raw proposal texts collected above.
proposals

In [None]:
# Positions (within `proposals`) of the texts that mention "unanimidade".
unanimous_idx

In [None]:
# Total number of proposal texts collected.
len(proposals)

In [None]:
# Number of proposal texts that were not flagged as unanimous.
len(proposals) - len(unanimous_idx)

## Scrape voting tables

In [None]:
def get_top_mid_coords(bbox: Tuple[float, float, float, float], offset: int = 10) -> Tuple[float, float]:
    """
    Locate the horizontal centre of a bounding box's top edge, shifted by ``offset``.

    Args:
        bbox (Tuple[float, float, float, float]): The bounding box; items 0 and 2
            are averaged for x, item 3 is used as the top y.
        offset (int, optional): Amount added to the top y coordinate. Defaults to 10.

    Returns:
        Tuple[float, float]: ``(x, y)`` of the shifted top-centre point.
    """
    centre_x = (bbox[0] + bbox[2]) / 2
    shifted_top_y = bbox[3] + offset
    return (centre_x, shifted_top_y)

def get_bottom_mid(bbox: Tuple[float, float, float, float]) -> Tuple[float, float]:
    """
    Get the coordinates of the middle of the bottom of a bounding box.

    Args:
        bbox (Tuple[float, float, float, float]): The bounding box; items 0 and 2
            are averaged for x, item 1 is used as the bottom y.

    Returns:
        Tuple[float, float]: The coordinates of the middle of the bottom of the bounding box.
    """
    return ((bbox[0] + bbox[2]) / 2, bbox[1])

def calc_distance_between_coords(p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
    """
    Calculate the Euclidean distance between two 2-D points.

    Args:
        p1 (Tuple[float, float]): The first point.
        p2 (Tuple[float, float]): The second point.

    Returns:
        float: The distance between the two points.
    """
    # math.hypot is the standard-library form of sqrt(dx**2 + dy**2) and does
    # not overflow on the intermediate squares.
    return math.hypot(p1[0] - p2[0], p1[1] - p2[1])

def get_closest_text(table, htext_objs, initial_offset: int = 0, max_tries: int = 100):
    """
    Guess a table's title by picking the text object nearest to its top edge.

    The reference point is the top-centre of ``table._bbox`` shifted by an
    offset that starts at ``initial_offset`` and grows by one per attempt.
    Attempts stop as soon as the chosen text has at least 10 characters, or
    after ``max_tries`` attempts.

    Args:
        table: Object exposing a ``_bbox`` bounding box.
        htext_objs: Text objects exposing ``bbox`` and ``get_text()``.
        initial_offset (int, optional): Offset used on the first attempt. Defaults to 0.
        max_tries (int, optional): Maximum number of attempts. Defaults to 100.

    Returns:
        The stripped text of the last chosen object, or None when no object
        was ever chosen (for example when ``htext_objs`` is empty).
    """
    closest_text = None
    for attempt in range(max_tries):
        # middle of the TOP of the table, shifted for this attempt
        anchor = get_top_mid_coords(table._bbox, offset=initial_offset + attempt)
        shortest = np.inf
        for text_obj in htext_objs:
            # middle of the BOTTOM of the text
            gap = calc_distance_between_coords(get_bottom_mid(text_obj.bbox), anchor)
            if gap < shortest:
                shortest = gap
                closest_text = text_obj.get_text().strip()
        if closest_text is not None and len(closest_text) >= 10:
            break
    return closest_text

def get_tables_and_titles(pdf_filename):
    """
    Extract every table camelot finds in a PDF and guess a title for each one.

    For each table, its page is saved on its own into a temporary directory,
    the horizontal text objects of that page are collected, and the text
    nearest to the top of the table (see ``get_closest_text``) is taken as
    the title.

    Args:
        pdf_filename: Path of the PDF file to read.

    Returns:
        A ``(titles, tables)`` pair: ``titles`` holds one guessed title per
        table (possibly ``None``) and ``tables`` is whatever
        ``camelot.read_pdf`` returned.
    """
    my_handler = PDFHandler(pdf_filename)  # from camelot.handlers import PDFHandler
    tables = camelot.read_pdf(pdf_filename, pages="all")
    titles = []
    with camelot.utils.TemporaryDirectory() as tempdir:
        for table in tqdm(tables, desc=f"Extracting {tables.n:d} tables"):
            # NOTE(review): _save_page is a private camelot API; it presumably writes
            # the page as page-<n>.pdf inside tempdir, which is the file read back
            # below - confirm against the installed camelot version.
            my_handler._save_page(pdf_filename, table.page, tempdir)
            tmp_file_path = os.path.join(tempdir, f'page-{table.page}.pdf')
            layout, dim = camelot.utils.get_page_layout(tmp_file_path)
            htext_objs = camelot.utils.get_text_objects(layout, ltype="horizontal_text")
            titles.append(get_closest_text(table, htext_objs))  # Might be None

    return titles, tables

# Run the extraction on the configured PDF and show each table under its guessed title.
titles, tables = get_tables_and_titles(pdf_path)
for title, table in zip(titles, tables):
    print(title)
    display(table.df)  # `display` is provided by the notebook environment
    print("---------------------------------------------------------\n")

## Associate a table with each proposal

In [None]:
# Walk through the proposals in document order and pair each one with the next
# unused titled table whenever that table's title occurs in the proposal text.
# In the resulting mapping, -1 marks a unanimous proposal (no table needed) and
# None marks a proposal that has not been matched yet.
def _squash(text: str) -> str:
    """Lower-case `text` and drop spaces and U+F0DE so comparisons ignore layout."""
    return text.lower().replace("\uf0de", "").replace(" ", "")


unanimous_proposals = set(unanimous_idx)
table_idx_of_proposal = {idx: None for idx in range(len(proposals))}
# A title can be None (no nearby text found) or empty; those tables cannot be
# matched by title, so they are set aside and appended at the end.
tables_to_be_assigned = [idx for idx in range(len(tables)) if titles[idx]]
for proposal_idx, proposal_text in enumerate(proposals):
    if proposal_idx in unanimous_proposals:
        table_idx_of_proposal[proposal_idx] = -1
        continue
    if not tables_to_be_assigned:
        # Every titled table is taken; keep looping only so that the remaining
        # unanimous proposals still get flagged.
        continue
    table_idx = tables_to_be_assigned[0]
    if _squash(titles[table_idx]) in _squash(proposal_text):
        table_idx_of_proposal[proposal_idx] = table_idx
        tables_to_be_assigned.pop(0)
tables_to_be_assigned += [idx for idx in range(len(tables)) if not titles[idx]]
unmatched_proposals = sum(
    1 for assigned in table_idx_of_proposal.values() if assigned is None
)
print(
    f"Tables left to be assigned: {len(tables_to_be_assigned)}\n"
    f"Proposals left to be assigned: {unmatched_proposals}"
)

In [None]:
# Texts of the proposals whose entry in the mapping is still None.
proposals_without_table = [
    proposal_text
    for position, proposal_text in enumerate(proposals)
    if table_idx_of_proposal[position] is None
]
proposals_without_table

In [None]:
# Proposal texts that, once spaces are removed, contain neither a "(PARTY)"
# tag for any known party nor the "(GOV)" tag.
owner_tags = [f"({party})" for party in deputies_per_party.keys()] + ["(GOV)"]
proposals_without_owner_party = [
    proposal
    for proposal in proposals
    if not any(tag in proposal.replace(" ", "") for tag in owner_tags)
]
proposals_without_owner_party

In [None]:
# Treat an owner-less text as a fragment of the entry right before it: when the
# fragment received a table but its predecessor did not, the table is handed to
# the predecessor and the fragment is removed from `proposals`.
for proposal in proposals_without_owner_party:
    # list.index returns the first match, so duplicated texts resolve to their earliest position
    idx = proposals.index(proposal)
    if idx == 0:
        # nothing precedes the first entry
        continue
    if (table_idx_of_proposal[idx-1] is None) and (table_idx_of_proposal[idx] is not None):
        # give the fragment's table to the preceding proposal, which is now matched
        table_idx_of_proposal[idx-1] = table_idx_of_proposal[idx]
        proposals_without_table.remove(proposals[idx-1])
        # drop the fragment itself (list.remove deletes the first equal element)
        proposals.remove(proposal)
        # shift all the indices of table_idx_of_proposal
        for i in range(idx, len(table_idx_of_proposal)-1):
            table_idx_of_proposal[i] = table_idx_of_proposal[i+1]
        # discard the surplus last key so the mapping has one entry per remaining proposal
        table_idx_of_proposal.pop(list(table_idx_of_proposal.keys())[-1])
print(
    f"Tables left to be assigned: {len(tables_to_be_assigned)}\n"
    f"Proposals left to be assigned: {len(proposals_without_table)}"
)

In [None]:
# Positional fallback: when as many proposals as tables are left over, pair
# them up in order, accepting a pair only if the table index lies strictly
# between the tables already assigned to the surrounding proposals.
if len(proposals_without_table) == len(tables_to_be_assigned):
    # Snapshot the pairs first: both lists shrink while the loop runs.
    pairs_to_assign = list(zip(proposals_without_table, tables_to_be_assigned))
    for proposal, table_idx in pairs_to_assign:
        proposal_idx = proposals.index(proposal)
        # only assign this table if it occurs between two nearest proposals' tables
        previous_table_idx = max(
            (
                table_idx_of_proposal[idx]
                for idx in range(proposal_idx)
                if table_idx_of_proposal[idx] is not None
            ),
            default=-1,  # no earlier proposal has a table: any table index qualifies
        )
        next_table_idx = min(
            (
                table_idx_of_proposal[idx]
                for idx in range(proposal_idx, len(proposals))
                if table_idx_of_proposal[idx] is not None
                and table_idx_of_proposal[idx] != -1
            ),
            default=len(tables),  # no later proposal has a table: any table index qualifies
        )
        if previous_table_idx < table_idx < next_table_idx:
            table_idx_of_proposal[proposal_idx] = table_idx
            proposals_without_table.remove(proposal)
            tables_to_be_assigned.remove(table_idx)
print(
    f"Tables left to be assigned: {len(tables_to_be_assigned)}\n"
    f"Proposals left to be assigned: {len(proposals_without_table)}"
)

In [None]:
# Final mapping: proposal position -> table index (-1 = unanimous, None = unmatched).
table_idx_of_proposal

## Create a table for unanimous votes

In [None]:
# Show the raw DataFrame of the first extracted table.
tables[0].df

In [None]:
# Synthetic voting table for unanimous decisions: every party casts all of its
# seats as "FAVOR" and none as "CONTRA" or "ABSTENÇÃO".
party_votes = {
    party: [num_deputies, 0, 0]
    for party, num_deputies in deputies_per_party.items()
}
unanimous_df = pd.DataFrame(
    {"voto": ["FAVOR", "CONTRA", "ABSTENÇÃO"], **party_votes}
).set_index("voto")
unanimous_df

## Convert tables into rows and merge them

In [None]:
def _to_vote_count(cell, full_bench: int) -> int:
    """Translate one raw table cell into a number of votes."""
    if not isinstance(cell, str):
        return int(cell)
    token = cell.strip()
    if token == "X":
        # an "X" means the whole parliamentary group voted this way
        return full_bench
    if token == "":
        return 0
    return int(token)


def clean_voting_table(
    voting_df: pd.DataFrame,
    seats_per_party: "dict[str, int] | None" = None,
) -> pd.DataFrame:
    """
    Cleans up the voting table, fixing the header and making data numeric.

    Rows labelled below 1 (the header row as extracted from the PDF) are
    dropped, the columns are renamed to "voto" followed by the party names,
    an "X" cell becomes the party's full seat count, an empty cell becomes 0
    and any other cell is read as an integer. Surrounding whitespace in a
    cell is ignored. The input frame is left untouched.

    Args:
        voting_df (pd.DataFrame): The voting table to be cleaned up. Its first
            column holds the vote label and the remaining columns hold one
            party each, in the order of ``seats_per_party``.
        seats_per_party (dict[str, int] | None, optional): Seats per party, in
            column order. Defaults to the module-level ``deputies_per_party``.

    Returns:
        pd.DataFrame: The cleaned up voting table, indexed by "voto".

    Raises:
        ValueError: If the number of columns does not match, or a cell is
            neither "X", empty, nor an integer.
    """
    if seats_per_party is None:
        seats_per_party = deputies_per_party
    # fix the header; .copy() makes the assignments below act on an independent
    # frame instead of a slice of the caller's data
    cleaned = voting_df.loc[1:, :].copy()
    cleaned.columns = ["voto", *seats_per_party]
    # make data numeric
    for party, num_deputies in seats_per_party.items():
        cleaned[party] = (
            cleaned[party]
            .map(lambda cell, seats=num_deputies: _to_vote_count(cell, seats))
            .astype(int)
        )
    # set the index
    return cleaned.set_index("voto")

In [None]:
# Take a copy of the second extracted table to try the cleaning function on.
df = tables[1].df.copy()
df

In [None]:
# Clean the sample table and show the result.
df = clean_voting_table(df)
df

In [None]:
# Total of the "CONTRA" row across all parties in the sample table.
df.loc["CONTRA"].sum()

In [None]:
# "CONTRA" total as a fraction of every count in the sample table (abstentions included).
df.loc["CONTRA"].sum() / df.sum().sum()

In [None]:
# Index label of the row holding the largest value in the "PS" column.
df["PS"].idxmax()

In [None]:
# Try the owner lookup on a single proposal.
# NOTE(review): `proposal` is not defined in this cell - it is whatever value an
# earlier loop left behind, so the result depends on which cells ran before.
" | ".join([party for party in deputies_per_party.keys() if f"({party})" in proposal.replace(" ", "")])

In [None]:
# Build one summary row per proposal that has a voting table (or was unanimous):
# the overall outcome, how each party voted, the proposal text and its authors.
results = []
party_decisions = {party: [] for party in deputies_per_party}
descriptions = []
owners = []
# any row label other than these two is reported as an abstention
decision_of_vote_label = {"FAVOR": "favor", "CONTRA": "contra"}
for proposal_idx, table_idx in table_idx_of_proposal.items():
    if table_idx is None:
        # no voting table could be matched to this proposal
        continue
    if table_idx == -1:
        table_df = unanimous_df
    else:
        table_df = clean_voting_table(tables[table_idx].df)
    # Plurality rule: abstentions are not counted, so a proposal passes only
    # when the votes in favour outnumber the votes against. (Comparing the
    # "contra" share of ALL votes with 0.5 would approve proposals that were
    # out-voted whenever enough deputies abstained.)
    votes_in_favour = table_df.loc["FAVOR"].sum()
    votes_against = table_df.loc["CONTRA"].sum()
    results.append("aprovada" if votes_in_favour > votes_against else "rejeitada")
    for party, decisions in party_decisions.items():
        # the party's position is the row where it placed most of its votes
        party_majority_vote = table_df[party].idxmax()
        decisions.append(decision_of_vote_label.get(party_majority_vote, "abstenção"))
    proposal_text = proposals[proposal_idx]
    descriptions.append(proposal_text)
    # "(GOV)" counts as authored by the governing party
    tagged_text = proposal_text.replace(" ", "").replace("(GOV)", f"({gov})")
    owners.append(
        " | ".join(
            party for party in deputies_per_party if f"({party})" in tagged_text
        )
    )
final_df = pd.DataFrame(
    {
        "resultado": results,
        **party_decisions,
        "descricao": descriptions,
        "proposta_por": owners,
    }
)
final_df["descricao"] = final_df["descricao"].apply(clean_proposal_text)
final_df

## Further cleaning of proposals' text with LangChain

In [None]:
# List the cleaned descriptions before they are sent to the language model.
list(final_df.descricao.values)

In [None]:
class NewLineOutputParser(ListOutputParser):
    """List output parser that expects one item per line of the model's reply."""

    @property
    def lc_serializable(self) -> bool:
        """Always True."""
        return True

    def get_format_instructions(self) -> str:
        """Return the sentence describing the expected reply layout."""
        instructions = (
            "Your response should be a list of new line separated values, "
            "eg: `foo\nbar\nbaz`"
        )
        return instructions

    def parse(self, text: str) -> List[str]:
        """Trim surrounding whitespace from the reply and split it on newlines."""
        trimmed = text.strip()
        return trimmed.split("\n")

In [None]:
# Chat model used to repair the spacing of the descriptions; temperature is set to 0.
llm = ChatOpenAI(openai_api_key=OPENAI_API_KEY, model_name="gpt-3.5-turbo", temperature=0)

In [None]:
# Prompt pieces for the spacing-repair task.
system_message_prompt = SystemMessagePromptTemplate.from_template(
    "You are a helpful Portuguese writer, with experience in editing articles in Portuguese and correcting typos."
)
# The {format_instructions} placeholder is required: the caller passes
# format_instructions=..., and a variable that no template mentions is dropped
# silently, so the model would never be told how to lay out its reply.
human_message_prompt = HumanMessagePromptTemplate.from_template(
    "{format_instructions}\n"
    "Corrige o texto seguinte, removendo e criando espaços para formar palavras em Português corretamente:\n{text}"
)
chat_prompt = ChatPromptTemplate.from_messages(
    [
        system_message_prompt,
        human_message_prompt,
    ]
)
output_parser = NewLineOutputParser()

In [None]:
# Send the descriptions to the model in batches and collect the corrected lines.
descriptions = list(final_df.descricao.values)
outputs = []
batch_size = 10
for start in tqdm(range(0, len(descriptions), batch_size)):
    batch = descriptions[start : start + batch_size]
    input_prompt = chat_prompt.format_messages(
        # one description per line, mirroring the line-per-item reply that the
        # parser expects (formatting the list itself would send its Python repr)
        text="\n".join(batch),
        format_instructions=output_parser.get_format_instructions(),
    )
    output = llm(input_prompt)
    corrected = [
        line for line in output_parser.parse(output.content) if line.strip()
    ]
    if len(corrected) != len(batch):
        # The reply cannot be lined up with its inputs; keep the uncorrected
        # texts so that `outputs` stays aligned with `descriptions`.
        print(
            f"Batch starting at {start}: expected {len(batch)} lines, "
            f"got {len(corrected)}; keeping the original texts."
        )
        corrected = batch
    outputs.extend(corrected)
outputs

In [None]:
# Sanity check: both counts should be equal before the column is overwritten.
len(outputs), len(descriptions)

In [None]:
# Replace the descriptions with the model's output, printing the first five before and after.
print(f"Descriptions before chatbot:\n{final_df.descricao.values[:5]}\n")
# NOTE(review): assigning a list to a column presumably requires len(outputs) == len(final_df) - confirm the counts above match
final_df.descricao = outputs
print(f"Descriptions after chatbot:\n{final_df.descricao.values[:5]}")

In [None]:
# Display the final table with the corrected descriptions.
final_df