# Cargamos las variables de entorno

In [1]:
from dotenv import  load_dotenv
from pandas.core.interchange.dataframe_protocol import DataFrame
from unstructured_client.models.shared import JobStatus

from infrastructure.config.app_settings import get_app_settings

load_dotenv()
app_settings=get_app_settings()

# Obtenemos los datos desde el bucket

In [2]:
import boto3
from mypy_boto3_s3.client import S3Client
import os
# prefix
prefix:str="cartas_fmv/"
s3_client:S3Client=boto3.client('s3', app_settings.aws_settings.region)

In [3]:
def get_files(bucket_name:str,prefix_path:str,document_type:str="pdf", position:int | None=None)->list[str]:
    paginator = s3_client.get_paginator('list_objects_v2')
    results:list[str]=[]
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix_path):
        for obj in page.get("Contents", []):
            key=obj["Key"]
            if key.endswith("/"):
                continue
            if not key.lower().endswith(f".{document_type.lower()}"):
                continue
            results.append(key)
    if position is None:
        return results
    return [results[position]]

In [4]:
documents:list[str]=get_files(bucket_name=app_settings.s3_settings.bucket,prefix_path=prefix,document_type="pdf")
print("documents:",documents)

documents: ['cartas_fmv/107002101017693540_1.pdf', 'cartas_fmv/107125101001461297.pdf', 'cartas_fmv/107134101000916831_1.pdf', 'cartas_fmv/107147101000648625_1.pdf', 'cartas_fmv/107147101000838713_1.pdf', 'cartas_fmv/107147101000838713_2.pdf', 'cartas_fmv/107147101000841781_1.pdf', 'cartas_fmv/107147101000851803_1.pdf', 'cartas_fmv/107147101000851803_2.pdf', 'cartas_fmv/107147101000858331_1.pdf']


# Implementación de Textract

In [5]:
from mypy_boto3_textract.client import TextractClient

textract:TextractClient = boto3.client("textract", region_name=app_settings.aws_settings.region)

In [6]:
from mypy_boto3_textract.type_defs import StartDocumentAnalysisResponseTypeDef, BoundingBoxTypeDef


def start_analysis(file_key:str)->str | None:
    try:
        result:StartDocumentAnalysisResponseTypeDef=textract.start_document_analysis(
            DocumentLocation={
                "S3Object": {
                    "Bucket": app_settings.s3_settings.bucket,
                    "Name": file_key
                },

            },
            QueriesConfig={
                "Queries":[
                    {
                        "Text":"Who is the promotor?",
                        "Alias":"Promotor",
                        "Pages":[
                            "1"
                        ]
                    }
                ]
            },
            FeatureTypes=["TABLES","QUERIES"]
        )
        return result.get("JobId",None)
    except Exception as e:
        print(e)
        return None

In [7]:
from mypy_boto3_textract.literals import JobStatusType
from mypy_boto3_textract.type_defs import  GetDocumentAnalysisResponseTypeDef
import time

def get_analysis_result(job_id:str)->list[GetDocumentAnalysisResponseTypeDef]:
    time.sleep(1)
    response:GetDocumentAnalysisResponseTypeDef = textract.get_document_analysis(JobId=job_id)
    status: JobStatusType = response.get("JobStatus",None)
    print(f"Job status: {status}")
    while status=="IN_PROGRESS":
        time.sleep(5)
        response=textract.get_document_analysis(JobId=job_id)
        status: JobStatusType = response.get("JobStatus",None)
        print(f"Job status: {status}")
    all_responses=[response]
    while True:
        # En caso se tengan muchas paginas se realiza una paginación
        if "NextToken" not in all_responses[-1]:
            break
        all_responses.append(textract.get_document_analysis(JobId=job_id, NextToken=all_responses[-1]["NextToken"]))
    return all_responses

## Agrupar por páginas

In [8]:
from mypy_boto3_textract.type_defs import BlockTypeDef


def _group_by_page(
            response: list[GetDocumentAnalysisResponseTypeDef],
    ) -> tuple[list[BlockTypeDef], list[BlockTypeDef]]:
    """
    Agrupa los blocks por página y retorna (pages, blocks).
    """
    blocks: list[BlockTypeDef] = [b for r in response for b in r.get("Blocks", [])]
    pages: list[BlockTypeDef] = [p for p in blocks if p.get("BlockType") == "PAGE"]
    return pages, blocks

## Obtener metadata del promotor y fecha

### Obtener la metadata de la fecha encima de la palabra carta

In [9]:
def get_letter_block(results:list[BlockTypeDef])-> BlockTypeDef | None:
    # Encontramos las coordenadas de las páginas
    element: BlockTypeDef | None=next((e for e in results if e["BlockType"]=="LINE" and "carta n°" in e.get("Text","").lower()), None)
    return element


#### Evalúamos si se cumple condición del bounding box

In [46]:
def check_condition(top:float,left:float, width:float, r:BlockTypeDef)->bool:
    if r["BlockType"]=="PAGE" or r["BlockType"]=="QUERY":
        return False
    if r["BlockType"]!="LINE":
        return False
    bounding_box = r["Geometry"]["BoundingBox"]
    condition_1:bool=bounding_box["Top"]<top
    condition_2:bool=bounding_box["Left"]<left+width
    return condition_1 and condition_2

### Obtener todos los que esten por encima

In [39]:
def get_all_blocks_by_letter_position(letter_block:BlockTypeDef,results:list[BlockTypeDef])-> list[BlockTypeDef]:
    letter_bounding_box:BoundingBoxTypeDef | None= letter_block.get("Geometry",{}).get("BoundingBox",None)
    if letter_bounding_box is None:
        return []
    letter_bounding_box_top=letter_bounding_box["Top"]
    letter_bounding_box_left=letter_bounding_box["Left"]
    letter_bounding_box_width=letter_bounding_box["Width"]
    rx: list[BlockTypeDef]= [r for r in results if check_condition(letter_bounding_box_top,letter_bounding_box_left,letter_bounding_box_width,r) and r["BlockType"]!="PAGE"]
    return rx

### Obtener la fecha sabiendo la posición de la carta

In [55]:
def get_date_by_letter_position(letter_block:BlockTypeDef,results:list[BlockTypeDef])-> BlockTypeDef | None:
    letter_bounding_box:BoundingBoxTypeDef | None= letter_block.get("Geometry",{}).get("BoundingBox",None)
    if letter_bounding_box is None:
        return None
    letter_bounding_box_top=letter_bounding_box["Top"]
    letter_bounding_box_left=letter_bounding_box["Left"]
    letter_bounding_box_width=letter_bounding_box["Width"]
    all_blocks_candidate=get_all_blocks_by_letter_position(letter_block,results)
    #print(all_blocks_candidate[-1])
    rx: BlockTypeDef | None=next((r for r in reversed(all_blocks_candidate) if check_condition(letter_bounding_box_top,letter_bounding_box_left,letter_bounding_box_width,r) and r["BlockType"]!="PAGE") , None)
    return rx

#### Obtener el promoter desde la query

In [12]:
def get_promotor_by_query_result(results:list[BlockTypeDef])-> BlockTypeDef | None:
    promotor_block:BlockTypeDef | None=next((p for p in results if p["BlockType"]=="QUERY_RESULT"), None)
    return promotor_block


#### Obtener el texto del promotor y la fecha

In [13]:
def get_text_for_promotor_and_date(promotor_block: BlockTypeDef | None, start_date_block:BlockTypeDef | None)->tuple[str | None,str| None]:
    """
    Retorna una tupla de promotor y fecha
    :param promotor_block:
    :param start_date_block:
    :return:
    """
    promotor_text:str | None=None
    start_date_text:str | None=None
    if promotor_block is not None:
        promotor_text=promotor_block["Text"]
    if start_date_block is not None:
        start_date_text=start_date_block["Text"]

    return promotor_text,start_date_text


## Obtener la metadata de la tabla principal

In [14]:
from typing import Any, Dict, List, Tuple, Optional

def index_blocks_by_id(blocks: List[BlockTypeDef]) -> Dict[str, BlockTypeDef]:
    return {b["Id"]: b for b in blocks}

In [15]:
from typing import Any, Dict, List, Tuple, Optional


def get_children_ids(block: BlockTypeDef, types: Tuple[str, ...],block_by_id:Dict[str,BlockTypeDef]) -> List[str]:
    ids: List[str] = []
    for rel in block.get("Relationships", []) or []:
        if rel.get("Type") == "CHILD":
            ids.extend([rid for rid in rel.get("Ids", []) or []])
    # Filtra por tipos deseados
    return [rid for rid in ids if block_by_id.get(rid, {}).get("BlockType") in types]

In [16]:
from typing import Any, Dict, List, Tuple, Optional


def extract_cell_text(cell_block: Dict[str, Any],
                      block_by_id: Dict[str, BlockTypeDef],
                      checkbox_checked: str = "☑",
                      checkbox_unchecked: str = "☐") -> str:
    # Hijos WORD y SELECTION_ELEMENT
    text_parts: List[str] = []
    for rel in cell_block.get("Relationships", []) or []:
        if rel.get("Type") != "CHILD":
            continue
        for rid in rel.get("Ids", []) or []:
            child = block_by_id.get(rid)
            if not child:
                continue
            bt = child.get("BlockType")
            if bt == "WORD":
                txt = child.get("Text", "")
                if txt:
                    text_parts.append(txt)
            elif bt == "SELECTION_ELEMENT":
                state = child.get("SelectionStatus")
                text_parts.append(checkbox_checked if state == "SELECTED" else checkbox_unchecked)
    return " ".join(text_parts).strip()

In [17]:
from typing import Any, Dict, List, Tuple, Optional

def build_tables_from_textract(blocks: List[BlockTypeDef],
                               broadcast_spans: bool = False
                               ) -> List[Dict[str, Any]]:
    """
    Retorna una lista de tablas. Cada item:
    {
      "page": int,
      "table_block": <BLOCK TABLE>,
      "grid": List[List[str]]  # matriz de texto
    }
    """
    #global block_by_id
    block_by_id = index_blocks_by_id(blocks)

    # Agrupa TABLEs por página
    tables: List[BlockTypeDef] = [b for b in blocks if b.get("BlockType") == "TABLE"]
    result: List[Dict[str, Any]] = []

    for table in tables:
        page = table.get("Page", 1)
        # Recolecta CELLS hijos de la TABLE
        cell_ids:list[str] = get_children_ids(table, types=("CELL",), block_by_id=block_by_id)
        cell_blocks:list[BlockTypeDef] = [block_by_id[cid] for cid in cell_ids if cid in block_by_id]

        # Determina tamaño de la malla
        max_row = 0
        max_col = 0
        for c in cell_blocks:
            r = c.get("RowIndex", 1)
            cidx = c.get("ColumnIndex", 1)
            rsp = c.get("RowSpan", 1)
            csp = c.get("ColumnSpan", 1)
            max_row = max(max_row, r + rsp - 1)
            max_col = max(max_col, cidx + csp - 1)

        # Inicializa grid
        grid: List[List[Optional[str]]] = [[None for _ in range(max_col)] for _ in range(max_row)]

        # Coloca contenido
        for c in cell_blocks:
            r = c.get("RowIndex", 1) - 1
            cidx = c.get("ColumnIndex", 1) - 1
            rsp = c.get("RowSpan", 1)
            csp = c.get("ColumnSpan", 1)
            text = extract_cell_text(c, block_by_id)

            if rsp == 1 and csp == 1:
                # Simple
                grid[r][cidx] = text
            else:
                # Spans
                if broadcast_spans:
                    for rr in range(r, r + rsp):
                        for cc in range(cidx, cidx + csp):
                            grid[rr][cc] = text
                else:
                    # Solo en la esquina superior izquierda
                    grid[r][cidx] = text
                    # El resto queda como None para que sepas que era merged
                    # (puedes luego normalizar a "" si exportas a CSV)
                    pass

        # Normaliza None -> ""
        grid = [[cell if cell is not None else "" for cell in row] for row in grid]

        result.append({
            "page": page,
            "table_block": table,
            "grid": grid
        })

    return result


### Filtramos las tablas que contengan la palabra ADENDA ACTUAL

In [18]:
def contains_keyword_in_table(table_block, block_by_id) -> bool:
    for rel in table_block.get("Relationships", []):
        if rel.get("Type") == "CHILD":
            for cid in rel.get("Ids", []):
                cell = block_by_id.get(cid)
                if cell and cell.get("BlockType") == "CELL":
                    # Extrae texto de la celda
                    cell_text = extract_cell_text(cell, block_by_id)
                    #print("cell_text", cell_text)
                    if "ADENDA ACTUAL".lower() in cell_text.lower() or "DESEMBOLSADOS".lower() in cell_text.lower():
                        return True
    return False

### Operación de obtener la tabla de adenda

In [19]:
def get_table_with_adenda(tables,blocks:list[BlockTypeDef]):
    filtered = []
    block_by_id = index_blocks_by_id(blocks)
    for t in tables:
        if contains_keyword_in_table(t["table_block"], block_by_id):
            filtered.append(t)
    return filtered

### Reconstrucción en pandas

In [20]:
import pandas as pd
from IPython.display import display
def rebuild_in_pandas(tables, start=1):
    for i, t in enumerate(tables, start=start):
        df=pd.DataFrame(t["grid"])
        display(df.style.set_caption(f"Tabla {i} - Página {t['page']}"))

### Depuración del dataframe

In [81]:
import pandas as pd
from pandas import DataFrame

def clean_df(df: DataFrame) -> DataFrame:
    # 1) eliminar "N°" si existe (REASIGNANDO)
    df = df.drop(columns=["N°"], errors="ignore")

    # 2) columnas de interés
    keywords = ["monto", "importe"]
    cols = [c for c in df.columns if any(k in c.lower() for k in keywords)]
    sub = df.loc[:, cols].copy()

    # 3) normalizar strings por columna (iteración por posición, no por nombre)
    sub = sub.apply(lambda s: s.astype(str).str.strip() if s.dtype == "object" else s)

    # 4) quitar columnas duplicadas por CONTENIDO (deja solo una por cada serie idéntica)
    sub = sub.T.drop_duplicates().T

    return sub


### Transformación del dataframe

In [83]:
def transform_df(df:DataFrame) -> DataFrame:
    if df.shape[1]>3:
        raise ValueError(f"Se esperaban 3 columnas, pero se encontraron {df.shape[1]}: {list(df.columns)}")
    if df.shape[1]==3:
        df.columns=["disbursed_amount","reduced_amount","amount"]
    return df

### Limpieza de los elementos para convertirlos en números

In [87]:
import re
import pandas as pd

def _to_float_from_str(x: str) -> float | None:
    if pd.isna(x):
        return None
    s = str(x).strip()

    # quitar prefijos comunes: S/, S/., S, SI, etc.
    s = re.sub(r"^(S\/\.?|SI|S)\s*", "", s, flags=re.IGNORECASE)

    # detectar negativo por paréntesis
    neg = s.startswith("(") and s.endswith(")")
    if neg:
        s = s[1:-1]

    # quitar todo lo que no sea dígito, coma, punto o signo
    s = re.sub(r"[^0-9,.\-]", "", s)

    if not re.search(r"\d", s):
        return None

    # lógica separadores
    if "," in s and "." in s:
        if s.rfind(".") > s.rfind(","):  # 1,234.56 → '.' decimal
            s = s.replace(",", "")
        else:  # 1.234,56 → ',' decimal
            s = s.replace(".", "").replace(",", ".")
    elif "," in s:
        parts = s.split(",")
        if len(parts) == 2 and len(parts[1]) in (1, 2):
            s = s.replace(",", ".")  # decimal
        else:
            s = s.replace(",", "")   # miles
    # else: solo '.', Python ya lo entiende como decimal

    try:
        val = float(s)
        return -val if neg else val
    except ValueError:
        return None

def clean_currency_columns(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
    out = df.copy()
    for c in cols:
        if c in out.columns:
            out[c] = out[c].apply(_to_float_from_str)
    return out


### Helper de pandas para detectarlo como tabla

In [68]:
import pandas as pd
from typing import List


def df_from_grid_with_two_header_rows(
   table:dict[str, Any],    # usado en "concat"
) -> pd.DataFrame:
    grid: List[List[str]] = table.get("grid", [])
    assert len(grid) >= 2, "Se necesitan al menos 2 filas (header y datos)"
    # Segunda fila como encabezado
    header = list(grid[1])
    # Datos desde la tercera fila
    data_rows = grid[2:]

    # Normaliza ancho de cada fila al número de columnas
    n_cols = len(header)
    def normalize_len(row: List[str]) -> List[str]:
        row = list(row)
        if len(row) < n_cols:
            row += [""] * (n_cols - len(row))
        elif len(row) > n_cols:
            row = row[:n_cols]
        return row

    data_rows = [normalize_len(r) for r in data_rows]

    # Construye DataFrame
    df = pd.DataFrame(data_rows, columns=header)
    df=clean_df(df)
    return df


## Pipeline

In [88]:
from pandas import  DataFrame
def pipeline(file_key:str)->str | None:
    job_id:str | None=start_analysis(file_key)
    if job_id is None:
        return None
    results_from_analysis:list[GetDocumentAnalysisResponseTypeDef]=get_analysis_result(job_id=job_id)
    pages, blocks = _group_by_page(results_from_analysis)
    letter_block:BlockTypeDef=get_letter_block(blocks)
    date_block:BlockTypeDef | None=get_date_by_letter_position(letter_block, blocks)
    #print(date_block)
    promotor_block=get_promotor_by_query_result(blocks)
    #Obtenemos el valor del promotor y la fecha de la carta
    promotor_text, start_date_text = get_text_for_promotor_and_date(promotor_block, date_block)
    # Obtenemos todas las tablas existentes
    tables = build_tables_from_textract(blocks, broadcast_spans=False)
    # Filtramos todas las tablas que contengan en alguna celda la palabra ADENDA ACTUAL
    #print("tablas",tables)
    tables_filtered=get_table_with_adenda(tables, blocks)
    #rebuild_in_pandas(tables_filtered)
    df:DataFrame=df_from_grid_with_two_header_rows(tables_filtered[0] if len(tables_filtered) else {})
    df=transform_df(df)
    df=clean_currency_columns(df, ["monto desembolsado", "monto reducido","importe"])
    print("documento",file_key)
    print("promotor", promotor_text,start_date_text)
    display(df)

    #print("tables", tables)

    #rebuild_in_pandas(tables_filtered)




    return ""


## Obtenemos el nombre de archivo para comparar

In [112]:
documents:list[str]=get_files(bucket_name=app_settings.s3_settings.bucket,prefix_path=prefix,document_type="pdf", position=2)
print("documents:",documents)

documents: ['cartas_fmv/107134101000916831_1.pdf']


# Ejecución de la extracción insegura

In [89]:
documents:list[str]=get_files(bucket_name=app_settings.s3_settings.bucket,prefix_path=prefix,document_type="pdf",
                              position=None)
for document in documents:
    pipeline(document)

Job status: IN_PROGRESS


KeyboardInterrupt: 

# Ejecución de la extracción segura

In [90]:

documents:list[str]=get_files(bucket_name=app_settings.s3_settings.bucket,prefix_path=prefix,document_type="pdf",
                              position=None)
documents_unprocessed:list[str]=[]

for document in documents:
    try:
        pipeline(document)
    except Exception as e:
        print(e)
        documents_unprocessed.append(document)

print("documentos no procesados",documents_unprocessed)
tasa_exito=((len(documents)-len(documents_unprocessed))/(len(documents) if len(documents)>0 else 1))*100
print("tasa de exito",tasa_exito)

Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: SUCCEEDED
documento cartas_fmv/107002101017693540_1.pdf
promotor INMOBILIARIA MALYVET SAC San Isidro, 15 de mayo del 2023.


Unnamed: 0,monto desembolsado,monto reducido,importe
0,257910.0,70480.0,187430.0


Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: SUCCEEDED
documento cartas_fmv/107125101001461297.pdf
promotor INMOBILIUM GRUPO INMOBILIARIO San Isidro, 30 de Marzo del 2023.


Unnamed: 0,monto desembolsado,monto reducido,importe
0,1614400.0,109350.0,1505050.0


Job status: IN_PROGRESS
Job status: SUCCEEDED
Se necesitan al menos 2 filas (header y datos)
Job status: IN_PROGRESS
Job status: SUCCEEDED
documento cartas_fmv/107147101000648625_1.pdf
promotor ENITH SOFIA VIGO DIAZ San Isidro, 26 de Abril de 2023


Unnamed: 0,monto desembolsado,monto reducido,importe
0,9006409.5,3263567.5,5742842.0


Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: SUCCEEDED
documento cartas_fmv/107147101000838713_1.pdf
promotor LOS PORTALES S.A. San Isidro, 24 de Abril del 2023.


Unnamed: 0,monto desembolsado,monto reducido,importe
0,5779764.7,254246.7,5525518.0
1,5779764.7,254246.7,5525518.0


Job status: IN_PROGRESS
Job status: SUCCEEDED
documento cartas_fmv/107147101000838713_2.pdf
promotor LOS PORTALES S.A. San Isidro, 24 de Abril del 2023.


Unnamed: 0,monto desembolsado,monto reducido,importe
0,5525518.0,89100.0,5436418.0
1,5525518.0,89100.0,5436418.0


Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: SUCCEEDED
documento cartas_fmv/107147101000841781_1.pdf
promotor CRISTHIAN DONNY SALCEDO ROJAS San Isidro, 29 de Mayo del 2023


Unnamed: 0,monto desembolsado,monto reducido,importe
0,386980.0,168080.0,218900.0
1,386980.0,168080.0,218900.0


Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: SUCCEEDED
documento cartas_fmv/107147101000851803_1.pdf
promotor CONSTRUCTORA JARDINES DE JUANJUI SAC San Isidro, 11 de Mayo del 2023


Unnamed: 0,monto desembolsado,monto reducido,importe
0,1715685.21,560720.99,1154964.22


Job status: IN_PROGRESS
Job status: SUCCEEDED
documento cartas_fmv/107147101000851803_2.pdf
promotor CONSTRUCTORA JARDINES DE JUANJUI SAC San Isidro, 25 de Mayo del 2023


Unnamed: 0,monto desembolsado,monto reducido,importe
0,1154964.22,22416.19,1132548.03
1,1154964.22,22416.19,1132548.03


Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: SUCCEEDED
documento cartas_fmv/107147101000858331_1.pdf
promotor HOGARIAN S.A.C el Distrito de San Pedro de Lloc, Provincia de Pacasmayo y Departamento de La Libertad, conforme


Unnamed: 0,monto desembolsado,monto reducido,importe
0,5795025.0,68512.5,726512.5
1,893530.0,173250.0,720280.0
2,381150.0,311850.0,69300.0


documentos no procesados ['cartas_fmv/107134101000916831_1.pdf']
tasa de exito 90.0
