In [None]:
import importlib
import os
import sys
import time

# Root of the validusBoxes checkout. Set the VALIDUS_PATH environment variable to
# point elsewhere; the hard-coded value is only a machine-specific fallback.
validus_path = os.environ.get(
    "VALIDUS_PATH", "/Users/ramnarayanchoudhary/Documents/GitHub/validusBoxes"
)
# Guard so re-running this cell does not keep appending the same sys.path entry.
if validus_path not in sys.path:
    sys.path.append(validus_path)
# Record the launch directory only once: on a re-run the cwd is already
# validus_path, and overwriting would lose the real original directory.
if "original_cwd" not in globals():
    original_cwd = os.getcwd()
os.chdir(validus_path)

import storage
import boxes.Frame.docling
import utils.unclassified

# Reload project modules so edits on disk are picked up without restarting the
# kernel (order kept: storage first, then the modules listed after it).
stuffToReImport = [storage, boxes.Frame.docling, utils.unclassified]
for myModule in stuffToReImport:
    importlib.reload(myModule)

# Bind names *after* the reload: a `from storage import STORAGE` executed before
# reload() keeps pointing at the stale class object.
from storage import STORAGE
from processor import processBox

In [None]:
# Statement PDFs to push through the Docling pipeline below.
files = [
    '075-013480.pdf',
    '075-897564.pdf',
    '077-010300.pdf',
    '075-897674.pdf',
]

In [None]:
# On-prem file storage for the 'manualBrokerage' client; `client` and
# `myStorage` are passed to processBox in the loop below.
client = 'manualBrokerage'
myStorageConfig = dict(defaultFileStorage='onPrem')

myStorage = STORAGE(client, myStorageConfig)


In [None]:
filesToSkip = []
for file in files:
    if file in filesToSkip:
        continue
    print(f"Processing file: {file}")
    # Box tree handed to processBox: the FileClassify box, post-processed by
    # dividePDFIntoPages, which is in turn post-processed by DoclingExtractByPage.
    aSLABox = dict(
        extraConfig=dict(fileName=file),
        boxesPostProcess=[
            dict(
                UBI="BOX::Frame.docling.dividePDFIntoPages",
                extraConfig={},
                boxesPostProcess=[
                    dict(UBI="BOX::Frame.docling.DoclingExtractByPage", extraConfig={}),
                ],
            ),
        ],
    )
    processBox("BOX::Frame.fileClassify.FileClassify", client, aSLABox, {}, 'now', myStorage)

    

In [None]:
# Pipeline: for each holdings page → load its tables → normalise columns → clean rows; only then merge the already-clean pages.
from __future__ import annotations
import re, json, numpy as np, pandas as pd
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional
from storage import STORAGE

class PDFHoldingsProcessor:
    """Extract fund holdings from Docling per-page output stored in layer ``l1``.

    For each PDF folder, pages whose text has exactly one ``asset detail`` line
    are treated as holdings pages: their tables are mapped onto a fixed column
    schema, cleaned page by page, and only then merged. The holdings plus plan
    metadata are written to ``<pdf_id>/doclingByPage/output.json`` and returned.
    """

    # Target column -> substrings identifying it in a lower-cased, space-free
    # source header. Order matters: the first matching target wins, so e.g.
    # "Beginning Market Value" maps to BEGINNING_MARKET, not ENDING_MARKET.
    _RULES = {
        'SHARES_UNITS'    : ['shares', 'units'],
        'DESCRIPTION'     : ['description', 'security', 'name', 'fund'],
        'BEGINNING_MARKET': ['beginning', 'start'],
        'ENDING_MARKET'   : ['ending', 'market', 'final'],
        'ADJUSTED_COST'   : ['adjusted', 'cost', 'basis'],
        'GAIN_LOSS'       : ['gain', 'loss', 'unrealized'],
    }
    _CORE = list(_RULES.keys())
    # Columns converted with _parse_numeric during cleaning.
    _NUMERIC = ['SHARES_UNITS', 'BEGINNING_MARKET', 'ENDING_MARKET', 'ADJUSTED_COST', 'GAIN_LOSS']
    # Descriptions that are section headers / totals rather than holdings.
    _HEADER_RE = re.compile(
        r'^(TOTAL(?!\s+RETURN)|SUBTOTAL|NET\s+ASSETS|GAIN\s*/\s*LOSS|UNREALIZED|COLLECTIVE|PARTICIPANT|NOTES|CASH)$',
        re.I,
    )
    # Tried in order: an explicit "TICKER: XXX" label, then any short all-caps word.
    _TICKER_PATTERNS = (r'TICKER[:\s]+([A-Z0-9.@]{2,12})', r'\b([A-Z]{2,6}X?)\b')
    _TICKER_STOPWORDS = frozenset({'FUND', 'INDEX', 'PRICE', 'MONTH', 'END', 'TICKER'})

    #  construction
    def __init__(self, storage_config: Dict, client: str):
        """Bind the processor to ``STORAGE(client, storage_config)``."""
        self.storage = STORAGE(client, storage_config)

    #  generic helpers
    @staticmethod
    def _strip(x):
        """Strip surrounding whitespace from strings; return other values unchanged."""
        return x.strip() if isinstance(x, str) else x

    @staticmethod
    def _natural_key(name: str) -> list:
        """Sort key that orders embedded digit runs numerically ('2' < '10')."""
        # re.split with a capturing group alternates text / digit-run tokens, so
        # odd positions are always digit runs and compare as ints, never mixed types.
        return [int(tok) if i % 2 else tok for i, tok in enumerate(re.split(r'(\d+)', name))]

    @staticmethod
    def _parse_numeric(v) -> float:
        """Parse a statement cell into a float; return NaN when it is not numeric.

        Strips currency symbols / thousands separators, treats ``(123.45)`` and
        ``123.45-`` as negatives, and keeps only the first of several dots.
        """
        if v is None or (isinstance(v, float) and np.isnan(v)):
            return np.nan
        if isinstance(v, (int, float)):
            return float(v)

        txt, neg = str(v).strip(), False
        if txt.startswith('(') and txt.endswith(')'):
            neg, txt = True, txt[1:-1]
        if txt.endswith('-'):
            neg, txt = True, txt[:-1]
        txt = re.sub(r'[^0-9.\-]', '', txt)          # keep digits / dot / minus
        if txt.count('.') > 1:                       # keep the first dot, drop the rest
            head, tail = txt.split('.', 1)
            txt = head + '.' + tail.replace('.', '')
        try:
            num = float(txt)
        except ValueError:
            return np.nan
        # Keep the sign: taking abs() here would silently turn accounting
        # negatives (losses) back into positive values.
        return -num if neg else num

    #  column mapping
    def _detect_schema(self, df: pd.DataFrame) -> Dict[str, str]:
        """Map each source column to the first ``_RULES`` target it matches."""
        out = {}
        for col in df.columns:
            norm = str(col).lower().replace(' ', '')
            for tgt, keys in self._RULES.items():
                if any(k in norm for k in keys):
                    out[col] = tgt
                    break
        return out

    def _standardise(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a frame with exactly the ``_CORE`` columns (NaN where unmapped).

        When several source columns map to one target, the first one wins.
        """
        mapping = self._detect_schema(df)
        res = pd.DataFrame(index=df.index)
        for tgt in self._CORE:
            src = next((c for c, t in mapping.items() if t == tgt), None)
            # `is not None`: a falsy label such as 0 is still a real column.
            res[tgt] = df[src] if src is not None else np.nan
        return res

    #  text helpers
    def _read_page_lines(self, pdf_id: str, page: str, filename: str) -> List[str]:
        """Lines of a per-page Docling file, or [] when the file does not exist."""
        f = Path(self.storage.getDir('l1', [pdf_id, 'doclingByPage', page, filename]))
        return f.read_text(encoding='utf-8').splitlines() if f.exists() else []

    def _page_text(self, pdf_id: str, page: str) -> List[str]:
        """Lines of ``docling_texts.txt`` for one page."""
        return self._read_page_lines(pdf_id, page, 'docling_texts.txt')

    def _page_markdown(self, pdf_id: str, page: str) -> List[str]:
        """Lines of ``docling_markdown.txt`` for one page."""
        return self._read_page_lines(pdf_id, page, 'docling_markdown.txt')

    def _is_holdings(self, pdf_id: str, page: str, lines: Optional[List[str]] = None) -> bool:
        """True when exactly one trimmed, lower-cased page line equals 'asset detail'.

        ``lines`` may carry already-read page text to avoid reading the file again.
        """
        if lines is None:
            lines = self._page_text(pdf_id, page)
        return [l.strip().lower() for l in lines].count('asset detail') == 1

    def _pages(self, pdf_id: str) -> List[str]:
        """Digit-named page folders under ``doclingByPage``, in numeric page order."""
        base = Path(self.storage.getDir('l1', [pdf_id, 'doclingByPage']))
        # Plain sorted() is lexicographic ('10' < '2') and scrambles page order.
        return sorted((p.name for p in base.iterdir() if p.is_dir() and p.name.isdigit()),
                      key=self._natural_key)

    #  table loading
    def _load_tables(self, pdf_id: str, page: str) -> List[pd.DataFrame]:
        """Load every non-empty ``docling_tables/*.csv`` table of a page via storage."""
        d = Path(self.storage.getDir('l1', [pdf_id, 'doclingByPage', page, 'docling_tables']))
        dfs = []
        for csv in sorted(d.glob('*.csv'), key=lambda p: self._natural_key(p.name)):
            rel = Path(pdf_id) / 'doclingByPage' / page / 'docling_tables' / csv.stem
            df = self.storage.getFullTableAsDF('l1', str(rel))
            if df is not None and not df.empty:
                dfs.append(df)
        return dfs

    #  cleaner
    @classmethod
    def _ticker(cls, t: str):
        """First ticker-like token in ``t`` that is not a stop word, else NaN."""
        upper = t.upper()
        for pattern in cls._TICKER_PATTERNS:
            # Scan every match: stopping at the first one let a leading stop
            # word (e.g. "FUND ABCDX") hide the real ticker behind it.
            for m in re.finditer(pattern, upper):
                if m.group(1) not in cls._TICKER_STOPWORDS:
                    return m.group(1)
        return np.nan

    @staticmethod
    def _fund_name(t: str) -> str:
        """Description with any 'TICKER: X' label and 'MONTH END PRICE n' text removed."""
        t = re.sub(r'TICKER[:\s]+[A-Z0-9.@]{2,12}', '', t, flags=re.I)
        t = re.sub(r'MONTH\s+END\s+PRICE\s*\d*\.?\d*', '', t, flags=re.I)
        return re.sub(r'\s{2,}', ' ', t).strip()

    def _clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Parse numbers, drop header/total/empty rows, and derive TICKER and FUND_NAME.

        Expects a frame produced by ``_standardise``. Returns the columns
        ``['FUND_NAME', 'TICKER'] + _CORE``; an empty input is returned unchanged.
        """
        if df.empty:
            return df
        # DataFrame.applymap was renamed to DataFrame.map in pandas 2.1.
        if hasattr(pd.DataFrame, 'map'):
            c = df.map(self._strip)
        else:
            c = df.applymap(self._strip)

        for n in self._NUMERIC:
            if n in c:
                c[n] = c[n].apply(self._parse_numeric)
        if 'SHARES_UNITS' in c:
            c['SHARES_UNITS'] = c['SHARES_UNITS'].abs()

        # Force strings so the .str accessor (and `~` on its result) and the regex
        # helpers never see NaN or numbers in DESCRIPTION.
        c['DESCRIPTION'] = c['DESCRIPTION'].fillna('').astype(str)
        desc = c['DESCRIPTION']
        good = desc.str.len().gt(2) & ~desc.str.match(self._HEADER_RE)
        nums = c['SHARES_UNITS'].gt(0) | c['ENDING_MARKET'].gt(0)
        # .copy(): the column assignments below must act on an independent frame,
        # not a slice of the unfiltered one.
        c = c[good & nums].copy()

        c['TICKER'] = c['DESCRIPTION'].apply(self._ticker)
        c['FUND_NAME'] = c['DESCRIPTION'].apply(self._fund_name)
        return c[['FUND_NAME', 'TICKER'] + self._CORE]

    # metadata
    @staticmethod
    def _as_of_date(text: List[str]) -> Optional[str]:
        """Last m/d/y (or m-d-y) date in ``text`` formatted 'YYYY-MM-DD', else None."""
        last = None
        for m in re.finditer(r'\b(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\b', ' '.join(text)):
            ds = m.group(1)
            sep = '/' if '/' in ds else '-'
            for fmt in (f'%m{sep}%d{sep}%Y', f'%m{sep}%d{sep}%y'):
                try:
                    last = datetime.strptime(ds, fmt).strftime('%Y-%m-%d')
                    break
                except ValueError:   # not this format / not a real date: try the next one
                    continue
        return last

    @staticmethod
    def _plan_info(txt2: List[str]) -> Optional[Dict]:
        """Plan name / number taken from the lines leading up to 'ACCOUNT NUMBER'.

        The window starts after the nearest line at or above the 'ACCOUNT NUMBER'
        line that mentions BANK, TRUST or IMAGE (else at the top). Returns None when
        no 'ACCOUNT NUMBER' line exists or the window is empty. Layout-specific.
        """
        acc_i = next((i for i, l in enumerate(txt2) if 'ACCOUNT NUMBER' in l.upper()), None)
        if acc_i is None:
            return None
        start_i = 0
        for j in range(acc_i, -1, -1):
            u = txt2[j].upper()
            if 'BANK' in u or 'TRUST' in u or 'IMAGE' in u:
                start_i = j + 1
                break
        window = [l.strip() for l in txt2[start_i:acc_i + 1] if l.strip()]
        if not window:
            return None
        joined = ' '.join(window)
        m = re.search(r'^(.*?)(?=\s*ACCOUNT NUMBER)', joined, re.I)
        plan_name = m.group(1).strip() if m else joined.strip()
        parts = [p.strip() for p in re.split(r'[;,]', plan_name)]
        return {'plan_name': parts[0], 'counterparty_name': "TRUST BANK", 'plan_number': parts[-1]}

    def _metadata(self, text: List[str], txt2: List[str]) -> Optional[Dict]:
        """Statement metadata, or None when the plan block cannot be located.

        ``as_of_date`` is the last date found in ``text``; the plan fields come
        from the markdown lines ``txt2`` (see ``_plan_info``).
        """
        plan = self._plan_info(txt2)
        if plan is None:
            return None
        return {'as_of_date': self._as_of_date(text), **plan}

    #  store
    def _save(self, pdf_id: str, payload: Dict):
        """Write ``payload`` as JSON to ``<pdf_id>/doclingByPage/output.json``."""
        out = Path(self.storage.getDir('l1', [pdf_id, 'doclingByPage'])) / 'output.json'
        out.write_text(json.dumps(payload, indent=2, default=str), encoding='utf-8')

    @staticmethod
    def _holding_record(r) -> Dict:
        """Convert one cleaned row (attribute access) into an output holding dict."""
        units, mv = r.SHARES_UNITS, r.ENDING_MARKET
        has_units, has_mv = pd.notna(units), pd.notna(mv)
        return {
            'security_name': r.FUND_NAME,
            'ticker': r.TICKER if pd.notna(r.TICKER) else None,
            'cusip': None,
            'units_held': float(units) if has_units else None,
            # Price is derived, not read: ending market value / units (units != 0).
            'last_price': round(mv / units, 6) if (has_mv and has_units and units) else None,
            'market_value': float(mv) if has_mv else None,
            'currency': 'USD',
            'accrued_interest': None,
        }

    #  main API
    def process_pdf(self, pdf_id: str) -> Optional[Dict]:
        """Extract, save and return holdings plus metadata for one PDF folder.

        Returns None (after printing a notice) when no holdings rows survive
        cleaning. Plan metadata fields are None when the plan block is not found.
        """
        pages = self._pages(pdf_id)
        all_text: List[str] = []
        cleaned: List[pd.DataFrame] = []
        for page in pages:
            page_lines = self._page_text(pdf_id, page)
            all_text += page_lines
            if not self._is_holdings(pdf_id, page, page_lines):
                continue
            tables = self._load_tables(pdf_id, page)   # load once, reuse below
            if not tables:
                continue
            cln = self._clean(pd.concat([self._standardise(t) for t in tables], ignore_index=True))
            if not cln.empty:
                cleaned.append(cln)

        if not cleaned:
            print(f' No holdings extracted for {pdf_id}')
            return None

        holdings_df = pd.concat(cleaned, ignore_index=True)
        # Plan metadata is read from the markdown of the last page.
        # NOTE(review): confirm the last page is the intended source of the plan block.
        last_page_md = self._page_markdown(pdf_id, pages[-1])
        meta = self._metadata(all_text, last_page_md)
        if meta is None:
            # Spreading None (`**None`) raised TypeError and threw away the holdings.
            meta = {'as_of_date': self._as_of_date(all_text), 'plan_name': None,
                    'counterparty_name': None, 'plan_number': None}
        data = {'holdings': [self._holding_record(r) for r in holdings_df.itertuples(index=False)],
                **meta}
        self._save(pdf_id, data)
        print(f'Processed {len(data["holdings"])} holdings for {pdf_id}')
        return data

    def process_all_pdfs(self) -> Dict:
        """Run ``process_pdf`` for every id listed in layer ``l1``; failures map to None."""
        res = {}
        for pdf_id in self.storage.getAllLayerNFiles('l1', ''):
            try:
                res[pdf_id] = self.process_pdf(pdf_id)
            except Exception as e:   # best-effort batch: report and move on
                print(f' Error {pdf_id}: {e}')
                res[pdf_id] = None
        return res

#  run: extract holdings for every PDF folder in layer l1 and display the results
storage_config = dict(defaultFileStorage='onPrem')
proc = PDFHoldingsProcessor(storage_config, client='manualBrokerage')
results = proc.process_all_pdfs()
results