In [1]:
import sys
sys.path.append(r"d:\VSCode\re-assistant")

In [7]:
import os
import sys
import pytz
import redis
import asyncio
from datetime import datetime
from rich.console import Console
from rich.markdown import Markdown
from lib.utils import AGENT_MODEL, SYSTEM_PROMPT
from langchain.chat_models import init_chat_model
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, MessagesState, START, END
from lib.db.db_service import ThreadService
# from lib.db.db_conn import conn
from datetime import datetime
from lib.load_data import df
from rapidfuzz import fuzz
from lib.utils import match_value_in_columns, normalize_email_field, normalize_list
import re

Command-line environment detected. Using local data file.
Loading email metadata from: d:\VSCode\re-assistant\lib\data\all_mails.jsonl
Successfully loaded 11807 records for metadata.
Connecting to ChromaDB Vector Store...
Successfully connected to ChromaDB collection.


In [8]:
import nest_asyncio
import polars as pl
from langchain.tools import tool
nest_asyncio.apply()

In [None]:
temp_df = df.clone()

temp_df = temp_df.with_columns([
    pl.col("to").map_elements(normalize_list, return_dtype=str).alias("to_normalized")
])

temp_df = temp_df.with_columns([
    pl.col("cc").map_elements(normalize_list, return_dtype=str).alias("cc_normalized")
])

temp_df = temp_df.with_columns([
    pl.col("from").map_elements(normalize_list, return_dtype=str).alias("from_normalized")
])

print(temp_df['from_normalized'].to_list())

In [10]:
import polars as pl

def build_name_dict(df: pl.DataFrame) -> pl.DataFrame:
    """
    Vectorized, memory-efficient building of:
        token -> {"full": full_name, "emails": [list of emails]}

    Uses DataFrame.unpivot (replacement for deprecated melt), explode, and Polars string ops.
    """
    cols = [c for c in ["from_normalized", "to_normalized", "cc_normalized"] if c in df.columns]
    if not cols:
        raise ValueError("No normalized columns found. Expect one of: from_normalized, to_normalized, cc_normalized")

    # 1) unpivot (stack the normalized columns into a single column "addr")
    stacked = df.unpivot(index=[], on=cols, variable_name="src", value_name="addr")

    stacked = stacked.filter(pl.col("addr") != "")

    stacked = stacked.with_columns(
        pl.col("addr").str.split(",").alias("addr_list")
    )

    stacked = stacked.explode("addr_list")

    stacked = stacked.with_columns(
        pl.col("addr_list").str.strip_chars().alias("addr")
    ).drop("addr_list")

    stacked = stacked.filter(
        pl.col("addr").is_first_distinct().alias("unique_addr")
    )
    
    return stacked

In [11]:
stacked = build_name_dict(temp_df)
names_series = stacked['addr']

In [12]:
print(names_series)

shape: (2_646,)
Series: 'addr' [str]
[
	"harish.sales harish.sales@ahlaâ€¦
	"contact 2getherments infra pvtâ€¦
	"balakrishna info@2getherments.â€¦
	"malini satish kumar malini.satâ€¦
	"customer communications 2g cx â€¦
	â€¦
	"neeti1919@gmail.com"
	"nwajit@gmail.com"
	"arshkaur19@gmail.com"
	"anjalisinha373@gmail.com"
	"anindita92nayak@gmail.com"
]


In [13]:
import json

def parse_json(raw_response):
    if not raw_response:
        return None
    match = re.search(r'\{.*\}', raw_response, re.S)
    if match:
        return json.loads(match.group(0))
    return None

In [14]:
import tiktoken

enc = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str):
    return len(enc.encode(text))

In [18]:
import time
from typing import List, Tuple
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage

llm = ChatOpenAI(model='gpt-4.1-nano', temperature=0, max_tokens=300, api_key='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')

def run_batch_task(tasks: List[Tuple[int, List[HumanMessage], int]], tpm_limit: int = 180000) -> List[Tuple[int, str]]:
    """
    tasks: list of (task_id, messages, est_tokens)
    tpm_limit: max tokens/minute allowed
    returns: list of (task_id, response_text)
    """
    results: List[Tuple[int, str]] = []
    current_batch: List[Tuple[int, List[HumanMessage], int]] = []
    current_tokens = 0
    window_start = time.time()

    def flush(batch):
      """Send a batch to the LLM and record results."""
      print(f"\nðŸš€ Flushing {len(batch)} tasks "
              f"({sum(tok for _, _, tok in batch)} tokens)...")

      responses = llm.batch([msgs for _, msgs, _ in batch])
      for (task_id, _, _), resp in zip(batch, responses):
        print(f"   âœ… Task {task_id} completed.")
        results.append((task_id, resp.content))

    for task in tasks:
      _, _, tok = task

      if current_tokens + tok > tpm_limit and current_batch:
        flush(current_batch)
        current_batch, current_tokens = [], 0

        elapsed = time.time() - window_start
        if elapsed < 60:
          time.sleep(60 - elapsed)
        window_start = time.time()

      current_batch.append(task)
      current_tokens += tok
    
    if current_batch:
      flush(current_batch)

    return results

In [19]:
from typing import List, Tuple
from langchain.schema  import HumanMessage
from langchain.prompts import ChatPromptTemplate

prompt_template = ChatPromptTemplate.from_template(
"""
You are an information extraction system.  

Task:  
- Extract only meaningful PERSON NAME (not initials like 1-2 letters) tokens from the given string.
- Extract **every EMAIL address as a token without exception.**
- Emails must always be included, even if they contain numbers, company names, or other patterns.
- Split multi-part names into separate tokens (e.g., "Satish Kumar s" â†’ ["satish", "kumar"]).
- Ignore company suffixes or terms (e.g., "Ltd", "Inc", "Pvt", "Sales"), standalone numbers, and generic stop-words.
- Convert all tokens to lowercase.
- Return the result as a **strict JSON object only**, with no explanations, extra text, or formatting.
- Always return in the exact format below, with valid JSON only (no trailing commas, no comments).

Output format:
{{
  "tokens": ["", ""]
}}

Examples:  
Input: "customer cx customer.communications@2getherments.com"  
Output: {{ "tokens": ["customer", "customer.communications@2getherments.com"] }}

Input: "213 rahul sinha rahulsinha198@gmail.com"  
Output: {{ "tokens": ["rahul", "sinha", "rahulsinha198@gmail.com"] }}

Input: "pavan hs hspavankumar@yahoo.com"  
Output: {{ "tokens": ["pavan", "hspavankumar@yahoo.com"] }}

Now process this input:  
Full string: {full_name}
"""
)

tasks:List[Tuple[int, List[HumanMessage], int]] = []

for idx, full_name in enumerate(names_series):
  formatted_prompt = prompt_template.format(full_name=full_name)
  token_est = count_tokens(formatted_prompt)
  messages = [HumanMessage(content=formatted_prompt)]
  tasks.append((idx, messages, token_est))

In [23]:
# results = run_batch_task(tasks=tasks)

In [None]:
for task_id, resp in results:
    print(task_id, parse_json(resp))

In [22]:
import json
from pathlib import Path

out_path = Path("lib/data/name_token_map.jsonl")
with out_path.open("w", encoding='utf-8') as f:
    for task_id, resp in results:
        try:
            parser_json = parse_json(resp)
            f.write(json.dumps({
                "full_name": names_series[task_id],
                "tokens": parser_json['tokens']
            }) + "\n")
        except json.JSONDecodeError:
            print(f"Bad JSON for row {task_id}: {resp}")

In [61]:
import json
from collections import defaultdict

input_file  = Path("lib/data/name_token_map.jsonl")
output_file = Path("lib/data/token_map.jsonl")

# Step 1: Build token_map
token_map = defaultdict(set)

with open(input_file, "r", encoding="utf-8") as f:
    for line in f:
        entry = json.loads(line)
        full_name = entry["full_name"].strip()   # remove leading/trailing spaces
        tokens = entry["tokens"]

        for token in tokens:
            # Optional: normalize token if needed
            normalized_token = token.strip().lower()
            token_map[normalized_token].add(full_name)

# Step 2: Write out as JSONL (deduplicated automatically via set)
with open(output_file, "w", encoding="utf-8") as f:
    for token, names in token_map.items():
        # Sort for consistency (optional)
        unique_names = sorted(names)
        f.write(json.dumps({token: unique_names}, ensure_ascii=False) + "\n")

In [228]:
import json
from pathlib import Path
from collections import defaultdict

stacked = build_name_dict(temp_df)
names_series = stacked['addr']

token_map = defaultdict(set)
word_re = re.compile(r"[a-zA-Z]+(?:[-'][a-zA-Z]+)*")
for full in names_series:
    cleaned = re.sub(
        r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}",
        "",
        full
    )
    for token in word_re.findall(cleaned.lower()):
        token_map[token].add(full)

out_file = Path("lib/data/name_token_map.jsonl")
with out_file.open("w", encoding="utf-8") as f:
    for token, fulls in token_map.items():
        f.write(json.dumps({"token": token, "full_names": list(fulls)}) + "\n")

In [6]:
from collections import defaultdict
from typing import Dict, Set
import json

token_map: Dict[str, Set[str]] = defaultdict(set)

with open("lib/data/token_map.jsonl", "r", encoding='utf-8') as f:
    for line in f:
        entry = json.loads(line)
        for token, names in entry.items():
            token_map[token].update(names)

token_map = dict(token_map)

In [52]:
import re
from typing import Dict, Set
from difflib import SequenceMatcher

WORD_RE = re.compile(r"[a-zA-Z]+(?:[-'][a-zA-Z]+)*")

def normalize_token(s: str) -> str:
    """Normalize separators for comparison."""
    return re.sub(r"[-_]+", " ", s.lower()).strip()

def expand_query(query: str, token_map: Dict[str, Set[str]], fuzzy_threshold: float = 0.75) -> str:
    if not token_map:
        return query

    query_tokens = WORD_RE.findall(query)
    expanded_tokens = []

    for q_tok in query_tokens:
        q_norm = normalize_token(q_tok)
        best_full = None
        best_score = 0.0

        for token, full_names in token_map.items():
            token_norm = normalize_token(token)
            for full in full_names:
                # Compare normalized query token against token and full name
                sim_token = SequenceMatcher(None, q_norm, token_norm).ratio()
                sim_full = SequenceMatcher(None, q_norm, normalize_token(full)).ratio()
                sim = max(sim_token, sim_full)

                if sim > best_score:
                    best_score = sim
                    best_full = full

        if best_full and best_score >= fuzzy_threshold:
            expanded_tokens.append(best_full)
        else:
            expanded_tokens.append(q_tok)

    return " ".join(expanded_tokens)

In [54]:
print(expand_query(query="need details of sankar narayan", token_map=token_map))

need details msme chamber of commerce and industry of india msmeglobalccii@msmeccii.in sankar narayanan sankar.narayanan@2getherments.com gvvsl narayana gvvslnarayana@gmail.com


In [118]:
import re
from rapidfuzz import fuzz, process
from typing import Dict, Set, Optional, Tuple

_HAVE_RAPIDFUZZ = True

def _normalize(s: str) -> str:
    """Normalize a name/email for robust matching."""
    if not s:
        return ""
    # remove angle/round-bracketed extras and email localparts
    s = re.sub(r"<[^>]+>", " ", s)
    s = re.sub(r"\([^)]*\)", " ", s)
    s = re.sub(r"\S+@\S+", " ", s)
    # replace separators with spaces, strip non-alphanumerics
    s = re.sub(r"[-_.]+", " ", s.lower())
    s = re.sub(r"[^a-z0-9\s]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def get_best_match_from_token_map(
    sender: str,
    token_map: Dict[str, Set[str]],
    threshold: int = 75
) -> Optional[Tuple[str, float]]:
    """
    Return (best_full_name, score) if match >= threshold (0-100), else None.
    Uses rapidfuzz when present, otherwise difflib fallback.
    """
    if not sender or not token_map:
        return None

    sender_norm = _normalize(sender)

    # Fast path: exact token key match (case-insensitive)
    sender_lower = sender.lower().strip()
    if sender_lower in (k.lower() for k in token_map.keys()):
        # pick best full_name for that token by comparing normalized forms
        for k in token_map:
            if k.lower() == sender_lower:
                best_full = max(
                    token_map[k],
                    key=lambda f: _score(sender_norm, _normalize(f))
                )
                best_score = _score(sender_norm, _normalize(best_full))
                if best_score >= threshold:
                    return best_full, best_score
                return None

    # Build candidate list (normalized strings) -> meta mapping
    candidates = []
    meta = {}  # candidate_norm -> (token_key, full_name_or_None)
    for token_key, full_names in token_map.items():
        tnorm = _normalize(token_key)
        if tnorm:
            candidates.append(tnorm)
            # token candidate references no specific full name (None)
            meta[tnorm] = (token_key, None)
        for full in full_names:
            fnorm = _normalize(full)
            if fnorm:
                candidates.append(fnorm)
                meta[fnorm] = (token_key, full)

    # If rapidfuzz available, use extractOne with a strong scorer (WRatio)
    if _HAVE_RAPIDFUZZ:
        # remove duplicates while preserving meta mapping (last wins but that's okay)
        unique_choices = list(dict.fromkeys(candidates))
        match = process.extractOne(
            sender_norm,
            unique_choices,
            scorer=fuzz.WRatio,
            score_cutoff=threshold
        )
        if not match:
            return None
        match_str, score, _ = match  # match_str is normalized candidate
        token_key, full_name = meta.get(match_str, (None, None))
        # If the match candidate was just a token key (full_name is None),
        # pick the best full_name under that token_key
        if full_name is None and token_key is not None:
            best_full = max(
                token_map[token_key],
                key=lambda f: fuzz.WRatio(sender_norm, _normalize(f))
            )
            best_score = fuzz.WRatio(sender_norm, _normalize(best_full))
            return (best_full, float(best_score)) if best_score >= threshold else None
        return (full_name, float(score))

    # Fallback: iterate and use SequenceMatcher ratio
    best_full = None
    best_score = 0.0
    for token_key, full_names in token_map.items():
        token_norm = _normalize(token_key)
        # score against token key
        token_score = max(_seq_ratio(sender_norm, token_norm), 0.0)
        if token_score * 100 > best_score:
            # if token seems promising check its full names
            for full in full_names:
                full_norm = _normalize(full)
                s = _seq_ratio(sender_norm, full_norm) * 100
                if s > best_score:
                    best_score = s
                    best_full = full
        # also compare sender directly to each full_name
        for full in full_names:
            full_norm = _normalize(full)
            s = _seq_ratio(sender_norm, full_norm) * 100
            if s > best_score:
                best_score = s
                best_full = full

    if best_full and best_score >= threshold:
        return best_full, best_score
    return None

# helper scoring functions (used by fallback and for small composition)
def _seq_ratio(a: str, b: str) -> float:
    if not a or not b:
        return 0.0
    return SequenceMatcher(None, a, b).ratio()  # returned 0..1

def _score(a_norm: str, b_norm: str) -> float:
    """Return 0..100 score using rapidfuzz if present, else difflib*100."""
    if _HAVE_RAPIDFUZZ:
        return float(fuzz.WRatio(a_norm, b_norm))
    return _seq_ratio(a_norm, b_norm) * 100

In [123]:

def normalize_text(s: str) -> str:
    """Normalize a name/email for robust matching."""
    if not s:
        return ""
    # remove angle/round-bracketed extras and email localparts
    s = re.sub(r"<[^>]+>", " ", s)
    s = re.sub(r"\([^)]*\)", " ", s)
    s = re.sub(r"\S+@\S+", " ", s)
    # replace separators with spaces, strip non-alphanumerics
    s = re.sub(r"[-_.]+", " ", s.lower())
    s = re.sub(r"[^a-z0-9\s]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def score(a_norm: str, b_norm: str) -> float:
    """Return 0..100 score using rapidfuzz if present, else difflib*100."""
    return float(fuzz.WRatio(a_norm, b_norm))

def get_best_match_from_token_map(
    sender: str,
    token_map: Dict[str, Set[str]],
    threshold: int = 75
) -> Optional[Tuple[str, float]]:
    """
    Return (best_full_name, score) if match >= threshold (0-100), else None.
    Uses rapidfuzz when present, otherwise difflib fallback.
    """
    if not sender or not token_map:
        return None
    sender_norm = normalize_text(sender)

    sender_lower = sender.lower().strip()
    if sender_lower in (k.lower() for k in token_map.keys()):
        # pick best full_name for that token by comparing normalized forms
        for k in token_map:
            if k.lower() == sender_lower:
                best_full = max(
                    token_map[k],
                    key=lambda f: score(sender_norm, normalize_text(f))
                )
                best_score = score(sender_norm, normalize_text(best_full))
                if best_score >= threshold:
                    return best_full, best_score
                return None
    
    candidates = []
    meta = {}
    for token_key, full_names in token_map.items():
        tnorm = normalize_text(token_key)
        if tnorm:
            candidates.append(tnorm)
            # token candidate references no specific full name (None)
            meta[tnorm] = (token_key, None)
        for full in full_names:
            fnorm = normalize_text(full)
            if fnorm:
                candidates.append(fnorm)
                meta[fnorm] = (token_key, full)

    # remove duplicates while preserving meta mapping (last wins but that's okay)
    unique_choices = list(dict.fromkeys(candidates))
    match = process.extractOne(
        sender_norm,
        unique_choices,
        scorer=fuzz.WRatio,
        score_cutoff=threshold
    )
    if not match:
        return None
    match_str, score, _ = match  # match_str is normalized candidate
    token_key, full_name = meta.get(match_str, (None, None))
    # If the match candidate was just a token key (full_name is None),
    # pick the best full_name under that token_key
    if full_name is None and token_key is not None:
        best_full = max(
            token_map[token_key],
            key=lambda f: fuzz.WRatio(sender_norm, normalize_text(f))
        )
        best_score = fuzz.WRatio(sender_norm, normalize_text(best_full))
        return (best_full, float(best_score)) if best_score >= threshold else None
    return (full_name, float(score))

In [2]:
from rapidfuzz import fuzz, process
from typing import Dict, Set, Optional, Tuple

def normalize_text(s: str) -> str:
    """Normalize a name/email for robust matching."""
    if not s:
        return ""
    # replace separators with spaces, strip non-alphanumerics
    s = re.sub(r"[-_.]+", " ", s.lower())
    s = re.sub(r"[^a-z0-9\s]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def get_best_match_from_token_map(
    sender: str,
    token_map: Dict[str, Set[str]],
    threshold: int = 75
) -> Optional[Tuple[str, float]]:
    """
    Return (best_full_name, score) if match >= threshold (0-100), else None.
    Uses rapidfuzz when present, otherwise difflib fallback.
    """
    if not sender or not token_map:
        return None
    sender_norm = normalize_text(sender)

    sender_lower = sender.lower().strip()
    if sender_lower in (k.lower() for k in token_map.keys()):
        # pick best full_name for that token by comparing normalized forms
        for k in token_map:
            if k.lower() == sender_lower:
                best_full = max(
                    token_map[k],
                    key=lambda f: fuzz.WRatio(sender_norm, normalize_text(f))  # fixed
                )
                best_score = fuzz.WRatio(sender_norm, normalize_text(best_full))  # fixed
                if best_score >= threshold:
                    return best_full, best_score
                return None
    
    candidates = []
    meta = {}
    for token_key, full_names in token_map.items():
        tnorm = normalize_text(token_key)
        if tnorm:
            candidates.append(tnorm)
            # token candidate references no specific full name (None)
            meta[tnorm] = (token_key, None)
        for full in full_names:
            fnorm = normalize_text(full)
            if fnorm:
                candidates.append(fnorm)
                meta[fnorm] = (token_key, full)

    # remove duplicates while preserving meta mapping (last wins but that's okay)
    unique_choices = list(dict.fromkeys(candidates))
    match = process.extractOne(
        sender_norm,
        unique_choices,
        scorer=fuzz.WRatio,
        score_cutoff=threshold
    )
    if not match:
        return None
    match_str, score, _ = match  # match_str is normalized candidate
    token_key, full_name = meta.get(match_str, (None, None))
    # If the match candidate was just a token key (full_name is None),
    # pick the best full_name under that token_key
    if full_name is None and token_key is not None:
        best_full = max(
            token_map[token_key],
            key=lambda f: fuzz.WRatio(sender_norm, normalize_text(f))
        )
        best_score = fuzz.WRatio(sender_norm, normalize_text(best_full))
        return (best_full, float(best_score)) if best_score >= threshold else None
    return (full_name, float(score))

In [136]:
sender = 'kishore kalagotla'
if sender:
    sender = sender.lower()
    res = get_best_match_from_token_map(sender, token_map, threshold=75)
    if res:
        print(res)
        best_full_name, score = res
        sender = best_full_name

('venkata kishore kalagotla via team 2getherments team.2getherments@2getherments.com', 90.0)


In [139]:
temp_df = df.clone()

In [142]:
sender = "kishore kalagotla 2getherments"

mask = pl.lit(True)
temp_df = temp_df.with_columns([
    pl.col("from").map_elements(normalize_list, return_dtype=str).alias("from_normalized")
])

sender_mask = pl.col("from_normalized").map_elements(lambda x: match_value_in_columns(sender, x), return_dtype=bool)
mask = mask & sender_mask

temp_df = temp_df.filter(mask)

In [144]:
print(temp_df['from_normalized'][0])

kishore kalagotla  kishore.kalagotla@2getherments.com


In [132]:
value = "sankar.narayanan@2getherments.com"
column = "sankar narayanan sankar.narayanan@2getherments.com"

print(fuzz.partial_ratio(value.lower(), column.lower()) > 80)

True


In [25]:
from lib.utils import normalize_list, match_value_in_columns, get_best_match_from_token_map
from lib.load_data import df, token_map
from langchain.tools import tool
from datetime import datetime, timedelta
import polars as pl

In [34]:
import re

def preprocess_subject(subject: str) -> str:
    if not isinstance(subject, str):
        return ""
    subject = re.sub(r'^(re|fwd|fw):\s*', '', subject, flags=re.I)  # remove reply/forward
    return subject.lower().strip()

In [36]:
print(preprocess_subject("Re: Refund Statement - 805"))

refund statement - 805


In [37]:
def extract_numbers(text: str) -> set[str]:
    return set(re.findall(r'\b\d+\b', text))

In [42]:
print(extract_numbers("refund statement - 805"))

{'805'}


In [178]:
import re
from rapidfuzz import fuzz

def preprocess_subject(subject: str) -> str:
    if not isinstance(subject, str):
        return ""
    # Lowercase and replace symbols with space
    subject = re.sub(r'[:\-_,]', ' ', subject)
    subject = re.sub(r'\s+', ' ', subject)  # normalize spaces
    return subject.lower().strip()

def extract_numbers(text: str) -> set[str]:
    return set(re.findall(r'\b\d+\b', text))

def smart_subject_match(user_value: str, column_value: str) -> bool:
    if not column_value:
        return False
    
    user_clean = preprocess_subject(user_value)
    col_clean = preprocess_subject(column_value)

    user_nums = extract_numbers(user_clean)
    col_nums = extract_numbers(col_clean)

    # --- Number must match if present ---
    if user_nums and not (user_nums & col_nums):
        return False

    # --- Fuzzy match on remaining text ---
    fuzz_score = fuzz.token_set_ratio(user_clean, col_clean) / 100

    if user_nums:
        # numbers match â†’ relax threshold
        return fuzz_score >= 0.65
    else:
        # no numbers â†’ require stricter match
        return fuzz_score >= 0.85

In [186]:
mask = pl.lit(True)
subject = "Refund Statement 805"
# "Refund for Flat no: 805"
# "Re: Refund Statement - 805"
temp_df = df.clone()

if subject:    
    subject_mask = pl.col("subject").map_elements(lambda x: smart_subject_match(subject, x), return_dtype=bool)
    mask = mask & subject_mask

temp_df = temp_df.filter(mask)

total_matches = temp_df.height
preview_cols = ["id", "threadId", "from", "to", "subject", "date", "cc", "snippet", "labels", "attachments"]

results_preview = temp_df.head(5).select(preview_cols).to_dicts()

In [187]:
for result in results_preview:
    print(result)
    print("----------------")

{'id': '198f083a5ce11830', 'threadId': '198f02e745824fb8', 'from': 'Sankar Narayanan <sankar.narayanan@2getherments.com>', 'to': ['nirav.j05@gmail.com'], 'subject': 'Re: Refund Statement - 805', 'date': '2025-08-28T17:20:03+05:30Z', 'cc': ['hari@2getherments.com', 'pallavi@2getherments.com', 'meena@2getherments.com', 'customer.communications@2getherments.com', 'ramakrishna@2getherments.com', 'shachiraju@2getherments.com'], 'snippet': '@Rama Krishna Please proceed further. On Thu, Aug 28, 2025 at 5:05 PM Nirav Joshi &lt;nirav.j05@gmail.com&gt; wrote: Thank you Sankar sir for your swift response. Please proceed with the same. On Thu,', 'labels': ['UNREAD', 'IMPORTANT', 'CATEGORY_PERSONAL', 'INBOX'], 'attachments': []}
----------------
{'id': '199098a3358031c1', 'threadId': '198f02e745824fb8', 'from': 'Nirav Joshi <nirav.j05@gmail.com>', 'to': ['sankar.narayanan@2getherments.com'], 'subject': 'Re: Refund Statement - 805', 'date': '2025-09-02T13:57:42+05:30Z', 'cc': ['hari@2getherments.com

In [149]:
from lib.utils import normalize_list, match_value_in_columns, smart_subject_match
from lib.load_data import df
from langchain.tools import tool
from datetime import datetime, timedelta
import polars as pl
from typing import Union

# @tool("email_filtering_tool", parse_docstring=True)
def email_filtering_tool(
    uid: str = None,
    threadId: str = None,
    sender: str = None,
    recipient: str = None,
    subject: str = None,
    cc: bool = False,
    labels: list[str] = None,
    start_date: str = None,
    end_date: str = None,
    body: bool = False,
    html: bool = False,
    sort_by: str = "date",
    sort_order: str = "desc",
    limit: int = 5,
) -> str:
    """
    This tool filter emails based on metadata such as sender (human), recipient (human), date range, or thread ID.
    
    Args:
        uid (str, optional): Filter emails by their unique UID. Exact match required.
        threadId: Filter emails by their conversation (email chian) thread ID, Returns all messages belonging to that specific chain (thread).
        sender (str or list of str, optional): Filter emails by sender(s). Can be full email address, partial email, or sender names (case-insensitive, only humans).
        recipient (str or list of str, optional): Filter emails by recipient(s). Can be full email addresses, partial emails, or recipient names, but strictly not numbers. (case-insensitive, only humans).
        subject (str, optional): Filter email by subject text. Can be full or partial subject string (case-insensitive).
        cc (bool, optional): Filter cc recepients of the email only when explicitly requested. Default False.
        labels (list of str, optional): Filter emails by one or more labels. Matches any email that contains at least one of the provided labels (case-insensitive).
        start_date (str, optional): Filter emails sent on or after this date. Format: 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.
        end_date (str, optional): Filter emails sent on or before this date. Format: 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.
        body (bool, optional): Include the plain-text email body only when explicitly requested. Default False.
        html (bool, optional): Include the full HTML body only when explicitly requested. Default False.
        sort_by (str, optional): Column to sort the results by. Default is 'date'.
        sort_order (str, optional): Sort order: 'asc' for ascending, 'desc' for descending. Default is 'desc'.
        limit (int, optional): Maximum number of results to return. Default is 5.
    """

    print(f"email_filtering_tool is being called {uid}, {threadId}, {sender}, {recipient}, {subject}, {cc}, {labels}, {start_date}, {end_date}, {body}, {html}, {sort_by}, {sort_order}, {limit}")
    temp_df = df.clone()
    mask = pl.lit(True)

    if uid:
        mask = mask & (pl.col("id") == uid)

    if threadId:
        mask = mask & (pl.col("threadId") == threadId)

    # --- Sender filter (case-insensitive, matches name or email) ---
    if sender:
        sender = sender.lower()
        # Add a normalized column
        temp_df = temp_df.with_columns([
            pl.col("from").map_elements(normalize_list, return_dtype=str).alias("from_normalized")
        ])
        # Filter rows where the normalized 'from' matches sender
        sender_mask = pl.col("from_normalized").map_elements(lambda x: match_value_in_columns(sender, x), return_dtype=bool)
        mask = mask & sender_mask

    # --- Recipient filter ---
    if recipient:
        recipient = recipient.lower()
        # Normalize 'to' and 'cc' columns which are lists
        temp_df = temp_df.with_columns([
            pl.col("to").map_elements(normalize_list, return_dtype=str).alias("to_normalized")
        ])
        # Filter rows where any normalized 'to' or 'cc' matches the recipient
        recipient_mask = (
            pl.col("to_normalized").map_elements(lambda x: match_value_in_columns(recipient, x), return_dtype=bool)
        )
        if cc:
            # Normalize 'to' and 'cc' columns which are lists
            temp_df = temp_df.with_columns([
                pl.col("cc").map_elements(normalize_list, return_dtype=str).alias("cc_normalized")
            ])
            # Filter rows where any normalized 'to' or 'cc' matches the recipient
            cc_mask = (
                pl.col("cc_normalized").map_elements(lambda x: match_value_in_columns(recipient, x), return_dtype=bool)
            )
            recipient_mask = recipient_mask | cc_mask

        mask = mask & recipient_mask

    # --- Date filtering (normalize to datetime) ---
    if start_date or end_date:
        temp_df = temp_df.with_columns([
            pl.col("date").str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%SZ", strict=False).alias("date_dt")
        ])
        
    if start_date:
        try:
            start_date_dt = datetime.strptime(start_date, "%Y-%m-%d")
            mask = mask & (pl.col("date_dt") >= start_date_dt)
        except Exception as e:
            return f"Error parsing start_date: {e}"

    if end_date:
        try:
            end_date_dt = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1) - timedelta(seconds=1)
            mask = mask & (pl.col("date_dt") <= end_date_dt)
        except Exception as e:
            return f"Error parsing end_date: {e}"
        
    if labels: 
        labels = [lbl.strip().lower() for lbl in labels]

        temp_df = temp_df.with_columns([
            pl.col("lables").map_elements(normalize_list, return_dtype=str).alias("labels_normalized")
        ])

        labels_mask = pl.col("labels_normalized").map_elements(
            lambda email_lables: any(lbl in email_lables for lbl in labels),
            return_dtype=bool
        )

        mask = mask & labels_mask

    if subject:    
        subject_mask = pl.col("subject").map_elements(lambda x: smart_subject_match(subject, x), return_dtype=bool)
        mask = mask & subject_mask

    # Apply the mask only once
    temp_df = temp_df.filter(mask)

    # --- Sorting ---
    temp_df = temp_df.sort(
        by=sort_by,
        descending=(sort_order.lower() == "desc")
    )

    # --- Handle empty result ---
    if temp_df.is_empty():
        return "No emails found matching the specified criteria."

    # --- Preview results ---
    total_matches = temp_df.height
    preview_cols = ["id", "threadId", "from", "to", "subject", "date", "cc", "snippet", "labels", "attachments"]
    if body:
        preview_cols.append("body")
    if html:
        preview_cols.append("html")

    if limit is None:
        results_preview = temp_df.select(preview_cols).to_dicts()
    else:
        results_preview = temp_df.head(limit).select(preview_cols).to_dicts()

    def fmt(res):
        parts = [
            f"id: {res.get('id','N/A')}",
            f"ThreadId: {res.get('threadId','N/A')}",
            f"From: {res.get('from','N/A')}",
            f"To: {res.get('to','N/A')}",
            f"CC: {res.get('cc','N/A')}",
            f"Subject: {res.get('subject','N/A')}",
            f"Date: {res.get('date','N/A')}",
            f"Snippet: {res.get('snippet','N/A')}",
            f"Labels: {res.get('labels','N/A')}",
            f"Attachments: {res.get('attachments','N/A')}",
        ]
        if body:
            parts.append(f"Body:\n{res.get('body','N/A')}")
        if html:
            parts.append(f"HTML:\n{res.get('html','N/A')}")
        return "\n".join(parts)

    formatted_results = "\n\n---\n\n".join(fmt(r) for r in results_preview)
    shown = total_matches if limit is None else min(int(limit), total_matches)
    return f"Found {total_matches} emails matching the criteria. Showing {shown}:\n\n{formatted_results}"

'Re: Refund Statement - 805'

In [None]:
import time
import tiktoken
import polars as pl
from typing import List, Tuple
from lib.load_data import df
from langchain.tools import tool
from datetime import datetime, timedelta
from langchain_openai import ChatOpenAI
from datetime import datetime, timedelta, timezone
from langchain.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage
from lib.utils import normalize_list, match_value_in_columns, smart_subject_match

template = """
You are an expert email summarizer.  

Task:  
- Input: Multiple emails with metadata (id, threadId, from, to, cc, Subject, date, snippet, body, labels, attachments).  
- Group by ThreadId and summarize chronologically.  
- Capture key points, actions, and important details with clarity and brevity.

Summarize this,
{chunk}
"""
prompt_perspective = ChatPromptTemplate.from_template(template)

llm = ChatOpenAI(
    model='gpt-4o-mini',
    temperature=0,
    max_completion_tokens=512
)

encoding_model = tiktoken.get_encoding("cl100k_base")
def get_chunks(text: str, chunk_size: int = 10000) -> List[str]:
    """Split a large text into token-based chunks."""
    tokens = encoding_model.encode(text)

    chunks = []
    for i in range(0, len(tokens), chunk_size):
        chunk_tokens = tokens[i:i+chunk_size]
        chunk_text = encoding_model.decode(chunk_tokens)
        chunks.append(chunk_text)
    return chunks

def count_tokens(text: str) -> int:
    return len(encoding_model.enc(text))

def run_batch_task(tasks: List[Tuple[int, List[HumanMessage], int]], tpm_limit: int = 29000) -> List[Tuple[int, str]]:
    """
    tasks: list of (task_id, messages, est_tokens)
    tpm_limit: max tokens/minute allowed
    returns: list of (task_id, response_text)
    """
    results: List[Tuple[int, str]] = []
    current_batch: List[Tuple[int, List[HumanMessage], int]] = []
    current_tokens = 0
    window_start = time.time()

    def flush(batch):
        """Send a batch to the LLM and record results."""
        nonlocal results
        if not batch:
            return
        responses = llm.batch([msgs for _, msgs, _ in batch])
        for (task_id, _, _), resp in zip(batch, responses):
            results.append((task_id, resp.content))

    for task in tasks:
        _, _, tok = task

        if current_tokens + tok > tpm_limit and current_batch:
            flush(current_batch)
            current_batch, current_tokens = [], 0

            # respect TPM limit
            elapsed = time.time() - window_start
            if elapsed < 60:
                time.sleep(60 - elapsed)
            window_start = time.time()

        current_batch.append(task)
        current_tokens += tok

    if current_batch:
        flush(current_batch)

    return results

def parse_datetime_utc(date_str: str) -> datetime:
    """
    Parse input date string and return a UTC-aware datetime object.
    Accepts 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.
    """
    if len(date_str) == 10:  # date-only
        dt = datetime.strptime(date_str, "%Y-%m-%d")
    else:  # full datetime
        dt = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    # Make it UTC-aware
    return dt.replace(tzinfo=timezone.utc)

def human_readable_date(timestamp) -> str:
    """
    Convert a timestamp to human-readable form.
    Accepts: str, datetime.datetime, or None
    """
    if timestamp is None:
        return "N/A"
    
    # If Polars datetime, convert to Python datetime
    if not isinstance(timestamp, datetime):
        try:
            # Try parsing string
            timestamp = datetime.fromisoformat(str(timestamp))
        except Exception:
            return "N/A"
    
    return timestamp.strftime("%a, %b %d, %Y %I:%M %p")

# @tool("email_filtering_tool", parse_docstring=True)
def email_filtering_tool(
    uid: str = None,
    threadId: str = None,
    sender: str = None,
    recipient: str = None,
    subject: str = None,
    cc: bool = False,
    labels: list[str] = None,
    start_date: str = None,
    end_date: str = None,
    body: bool = False,
    html: bool = False,
    sort_by: str = "date",
    sort_order: str = "desc",
    limit: int = None,
) -> str:
    """
    This tool filter emails based on metadata such as sender (human), recipient (human), date range, or thread ID.
    
    Args:
        uid (str, optional): Filter emails by their unique UID. Exact match required.
        threadId: Filter emails by their conversation (email chian) thread ID, Returns all messages belonging to that specific chain (thread).
        sender (str or list of str, optional): Filter emails by sender(s). Can be full email address, partial email, or sender names (case-insensitive, only humans).
        recipient (str or list of str, optional): Filter emails by recipient(s). Can be full email addresses, partial emails, or recipient names, but strictly not numbers. (case-insensitive, only humans).
        subject (str, optional): Filter email by subject text. Can be full or partial subject string (case-insensitive).
        cc (bool, optional): Filter cc recepients of the email only when explicitly requested. Default False.
        labels (list of str, optional): Filter emails by one or more labels. Matches any email that contains at least one of the provided labels (case-insensitive).
        start_date (str, optional): Filter emails sent on or after this date. Format: 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.
        end_date (str, optional): Filter emails sent on or before this date. Format: 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.
        body (bool, optional): Include the plain-text email body only when explicitly requested. Default False.
        html (bool, optional): Include the full HTML body only when explicitly requested. Default False.
        sort_by (str, optional): Column to sort the results by. Default is 'date_dt'.
        sort_order (str, optional): Sort order: 'asc' for ascending, 'desc' for descending. Default is 'desc'.
        limit (int, optional): Maximum number of results to return. set default value to 5.
    """

    print(f"email_filtering_tool is being called {uid}, {threadId}, {sender}, {recipient}, {subject}, {cc}, {labels}, {start_date}, {end_date}, {body}, {html}, {sort_by}, {sort_order}, {limit}")
    temp_df = df.clone()
    mask = pl.lit(True)

    temp_df = temp_df.with_columns([
        temp_df["body"].struct.field("text").alias("body_text"),
        temp_df["body"].struct.field("html").alias("body_html"),
    ])

    if uid:
        mask = mask & (pl.col("id") == uid)

    if threadId:
        mask = mask & (pl.col("threadId") == threadId)

    # --- Sender filter (case-insensitive, matches name or email) ---
    if sender:
        sender = sender.lower()
        # Add a normalized column
        temp_df = temp_df.with_columns([
            pl.col("from").map_elements(normalize_list, return_dtype=str).alias("from_normalized")
        ])
        # Filter rows where the normalized 'from' matches sender
        sender_mask = pl.col("from_normalized").map_elements(lambda x: match_value_in_columns(sender, x), return_dtype=bool)
        mask = mask & sender_mask

    # --- Recipient filter ---
    if recipient:
        recipient = recipient.lower()
        # Normalize 'to' and 'cc' columns which are lists
        temp_df = temp_df.with_columns([
            pl.col("to").map_elements(normalize_list, return_dtype=str).alias("to_normalized")
        ])
        # Filter rows where any normalized 'to' or 'cc' matches the recipient
        recipient_mask = (
            pl.col("to_normalized").map_elements(lambda x: match_value_in_columns(recipient, x), return_dtype=bool)
        )
        if cc:
            # Normalize 'to' and 'cc' columns which are lists
            temp_df = temp_df.with_columns([
                pl.col("cc").map_elements(normalize_list, return_dtype=str).alias("cc_normalized")
            ])
            # Filter rows where any normalized 'to' or 'cc' matches the recipient
            cc_mask = (
                pl.col("cc_normalized").map_elements(lambda x: match_value_in_columns(recipient, x), return_dtype=bool)
            )
            recipient_mask = recipient_mask | cc_mask

        mask = mask & recipient_mask

    # --- Date filtering (normalize to datetime) ---
    temp_df = temp_df.with_columns(
        pl.col("date")
        .str.to_datetime("%Y-%m-%dT%H:%M:%S%z", strict=False)
        .dt.convert_time_zone("UTC")
        .alias("date_dt")
    )

    if start_date:
        start_dt = parse_datetime_utc(start_date)
        mask = mask & (pl.col("date_dt") >= start_dt)

    if end_date:
        end_dt = parse_datetime_utc(end_date)
        # If only date provided, include the full day
        if len(end_date) == 10:
            end_dt = end_dt + timedelta(days=1) - timedelta(seconds=1)
        mask = mask & (pl.col("date_dt") <= end_dt)

    if labels: 
        labels = [lbl.strip().lower() for lbl in labels]

        temp_df = temp_df.with_columns([
            pl.col("lables").map_elements(normalize_list, return_dtype=str).alias("labels_normalized")
        ])

        labels_mask = pl.col("labels_normalized").map_elements(
            lambda email_lables: any(lbl in email_lables for lbl in labels),
            return_dtype=bool
        )

        mask = mask & labels_mask

    if subject:    
        subject_mask = pl.col("subject").map_elements(lambda x: smart_subject_match(subject, x), return_dtype=bool)
        mask = mask & subject_mask

    # Apply the mask only once
    temp_df = temp_df.filter(mask)

    # --- Sorting ---
    temp_df = temp_df.sort(
        by=sort_by,
        descending=(sort_order.lower() == "desc")
    )

    # --- Handle empty result ---
    if temp_df.is_empty():
        return "No emails found matching the specified criteria."

    # --- Preview results ---
    total_matches = temp_df.height
    preview_cols = ["id", "threadId", "from", "to", "subject", "date_dt", "cc", "snippet", "labels", "attachments"]
    if body:
        preview_cols.append("body_text")
    if html:
        preview_cols.append("body_html")

    if limit is None:
        results_preview = temp_df.select(preview_cols).to_dicts()
    else:
        results_preview = temp_df.head(limit).select(preview_cols).to_dicts()

    def fmt(res):
        parts = [
            f"id: {res.get('id','N/A')}",
            f"ThreadId: {res.get('threadId','N/A')}",
            f"From: {res.get('from','N/A')}",
            f"To: {res.get('to','N/A')}",
            f"CC: {res.get('cc','N/A')}",
            f"Subject: {res.get('subject','N/A')}",
            f"Date: {human_readable_date(res.get('date_dt'))}",
            f"Snippet: {res.get('snippet','N/A')}",
            f"Labels: {res.get('labels','N/A')}",
            f"Attachments: {res.get('attachments','N/A')}",
        ]
        if body:
            parts.append(f"Body: {res.get('body_text','N/A')}")
        if html:
            parts.append(f"HTML: {res.get('body_html','N/A')}")
        return "\n".join(parts)
    
    formatted_results = "\n\n---\n\n".join(fmt(r) for r in results_preview)
    shown = total_matches if limit is None else min(int(limit), total_matches)
    return f"Found {total_matches} emails matching the criteria. Showing {shown}:\n\n{formatted_results}"

In [26]:
email_filtering_tool(
    uid="198ca6688975b1b3",
    threadId=None,
    sender=None,
    recipient=None,
    subject=None,
    cc=False,
    labels=None,
    start_date=None,
    end_date=None,
    body=True,
    html=False,
    sort_by="date_dt",
    sort_order="desc",
    limit=5
)

email_filtering_tool is being called 198ca6688975b1b3, None, None, None, None, False, None, None, None, True, False, date_dt, desc, 5


"Found 1 emails matching the criteria. Showing 1:\n\nid: 198ca6688975b1b3\nThreadId: 197eed51c0afe726\nFrom: Santosh Turamari <santoshbt@outlook.com>\nTo: ['sankar.narayanan@2getherments.com']\nCC: ['ramakrishna@2getherments.com', 'hari@2getherments.com', 'pallavi@2getherments.com', 'meena@2getherments.com', 'customer.communications@2getherments.com', 'shachiraju@2getherments.com']\nSubject: Re: Refund Statement - 711\nDate: N/A\nSnippet: Hi Ramakrishna, I have given the account details to transfer the refund amount. You can deduct the interior deposit from the same. Please refund 22412 Rs this week. Confirm once transferred. Thanks,\nLabels: ['UNREAD', 'IMPORTANT', 'CATEGORY_PERSONAL', 'INBOX']\nAttachments: []\nBody: Hi Ramakrishna,\r\n\r\nI have given the account details to transfer the refund amount.\r\nYou can deduct the interior deposit from the same.\r\nPlease refund 22,412 Rs this week.\r\n\r\nConfirm once transferred.\r\n\r\nThanks,\r\nSantosh T\r\n____________________________

In [None]:
# 2025-08-28 11:50:03