# Extract HTML Data Into CSV FILE 

In [11]:
from pathlib import Path

# File paths
INPUT_FILE = Path("My Activity.html")
CATEGORY_CSV = Path("category_list.csv")

# Logging config
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
LOG_LEVEL = "INFO"


In [13]:
"""
Google Pay HTML transaction extractor.

This script parses Google Pay HTML export, extracts structured transactions,
and writes them into a clean CSV. It supports categorization based on a
predefined category CSV.
"""

from __future__ import annotations
import csv
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import html as pyhtml
import pandas as pd
# from config import INPUT_FILE, CATEGORY_CSV, OUTPUT_SUFFIX, LOG_FORMAT, LOG_LEVEL

try:
    from dateutil import parser as dateparser
except Exception:
    dateparser = None

# Setup logging
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT)
logger = logging.getLogger(__name__)


def load_category_map(path: Path) -> Dict[str, List[str]]:
    """
    Load category keywords from CSV and build a mapping.
    Tries UTF-8 first, falls back to cp1252/latin1.
    """
    encodings_to_try = ["utf-8", "cp1252", "latin1"]
    df = None

    for enc in encodings_to_try:
        try:
            df = pd.read_csv(path, dtype=str, encoding=enc).fillna("")
            logger.info("Loaded category CSV with encoding: %s", enc)
            break
        except Exception as e:
            logger.warning("Failed to load CSV with encoding %s: %s", enc, e)

    if df is None:
        logger.error("Could not load category CSV at all.")
        return {}

    category_map: Dict[str, List[str]] = {}
    for col in df.columns:
        kws = [str(x).strip().lower() for x in df[col].tolist() if str(x).strip()]
        seen, out = set(), []
        for kw in kws:
            if kw not in seen:
                seen.add(kw)
                out.append(kw)
        if out:
            category_map[col] = out
    logger.info("Loaded %d categories from %s", len(category_map), path)
    return category_map



def html_to_text_fast(html: str) -> str:
    """
    Convert raw HTML into plain text by stripping tags and decoding entities.
    """
    html = re.sub(r"(?is)<(script|style).*?>.*?</\1>", " ", html)
    html = re.sub(r"(?is)<br\s*/?>", "\n", html)
    html = re.sub(r"(?is)<[^>]+>", "\n", html)
    text = pyhtml.unescape(html)
    text = text.replace("\u00A0", " ").replace("\u202F", " ")
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def normalize(s: str) -> str:
    """Normalize whitespace and trim string."""
    s = s.replace("\u00A0", " ").replace("\u202F", " ")
    s = re.sub(r"[ \t]+", " ", s)
    return s.strip()


def extract_amount(sentence: str) -> Optional[float]:
    """Extract INR amount from a sentence like 'Paid ₹ 200.50'."""
    m = re.search(r"₹\s*([0-9][0-9,]*(?:\.\d{1,2})?)", sentence)
    if not m:
        return None
    try:
        return float(m.group(1).replace(",", ""))
    except Exception:
        return None


def extract_type_party_instrument(sentence: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Extract transaction type, counterparty, and instrument from a sentence."""
    ttype, party, instr = None, None, None

    try:
        mt = re.match(r"^(Paid|Sent|Received)\b", sentence, flags=re.I)
        if mt:
            ttype = mt.group(1).title()

        mp = re.search(r"\b(?:to|from|at)\s+(.+?)(?:\s+using\b|$)", sentence, flags=re.I)
        if mp:
            party = mp.group(1).strip(" .;")

        mi = re.search(r"\busing\s+(.+)$", sentence, flags=re.I)
        if mi:
            instr = mi.group(1).strip(" .;")
    except Exception as e:
        logger.error("Failed to extract type/party/instrument: %s", e)

    return ttype, party, instr


def extract_last4(instr: Optional[str]) -> Optional[str]:
    """Extract last 4 digits from a masked instrument string."""
    if not instr:
        return None
    try:
        m = re.search(r"(?:X|\*)+(?:\d{2,})", instr, flags=re.I)
        if m:
            digits = re.findall(r"\d", m.group(0))
            if len(digits) >= 4:
                return "".join(digits[-4:])
        m2 = re.search(r"(\d{4,})\b", instr)
        if m2:
            return m2.group(1)[-4:]
    except Exception:
        return None
    return None


def parse_ts(line: Optional[str]):
    """Parse timestamp into multiple formats (ISO, date, time, weekday, month)."""
    if not line or not dateparser:
        return None, None, None, None, None, None
    try:
        dt = dateparser.parse(line)
        return (
            dt.isoformat(),
            dt.date().isoformat(),
            dt.strftime("%I:%M:%S %p"),
            dt.strftime("%H:%M:%S"),
            dt.strftime("%A"),
            dt.strftime("%Y-%m"),
        )
    except Exception:
        return None, None, None, None, None, None


def match_category(counterparty: Optional[str], category_map: Dict[str, List[str]]) -> Optional[str]:
    """Match counterparty name against category keywords."""
    if not isinstance(counterparty, str) or not counterparty.strip():
        return None
    name = counterparty.lower().strip()
    for cat, kws in category_map.items():
        for kw in kws:
            if kw and kw in name:
                return cat
    return None


def parse_text_lines(lines: List[str], source_name: str, category_map: Dict[str, List[str]]) -> List[Dict]:
    """
    Parse lines of text and extract structured transactions.

    Args:
        lines: List of normalized text lines.
        source_name: Original file name.
        category_map: Category keywords.

    Returns:
        List of transaction dicts.
    """
    logger.info("Parsing transactions from text lines...")
    txns = []
    start_re = re.compile(r"^(Paid|Sent|Received)\s+₹\s*([0-9][0-9,]*(?:\.\d{1,2})?)", re.I)
    month_word = r"(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)"
    ts_re = re.compile(rf"\b{month_word}\s+\d{{1,2}},\s+\d{{4}},\s+\d{{1,2}}:\d{{2}}:\d{{2}}\s+(?:AM|PM)\s+GMT[+\-]\d{{2}}:\d{{2}}\b")

    n, i = len(lines), 0
    while i < n:
        line = lines[i]
        if not start_re.search(line):
            i += 1
            continue

        try:
            sentence = line
            ttype, party, instr = extract_type_party_instrument(sentence)
            amount = extract_amount(sentence)
            last4 = extract_last4(instr)

            # Timestamp
            ts_line, next_start_at = None, None
            for j in range(i + 1, min(n, i + 25)):
                if not ts_line:
                    m = ts_re.search(lines[j])
                    if m:
                        ts_line = m.group(0)
                if next_start_at is None and start_re.search(lines[j]):
                    next_start_at = j
                if ts_line and next_start_at:
                    break
            dt_iso, date_s, time_12, time_24, weekday, month = parse_ts(ts_line)

            # Status and Details ID
            details_id, status = None, None
            end_idx = next_start_at if next_start_at else min(n, i + 25)
            for j in range(i + 1, end_idx):
                s, low = lines[j], lines[j].lower()
                if not status and low in {
                    "completed","failed","pending","cancelled","canceled","processing","refunded","success","succeeded"
                }:
                    status = s.title()
                if "details" in low and ":" in s:
                    for k in range(j + 1, min(end_idx, j + 5)):
                        x = lines[k].strip()
                        if x and not x.endswith(":") and re.match(r"^[A-Za-z0-9+/=_-]{6,}$", x):
                            details_id = x
                            break
                if not details_id and s and not s.endswith(":") and re.match(r"^[A-Za-z0-9+/=_-]{10,}$", s):
                    details_id = s

            direction = "outgoing" if (ttype in {"Paid", "Sent"}) else ("incoming" if ttype == "Received" else None)
            category_guess = match_category(party, category_map)
            if not status:
                status = "No Status Found" if ttype == "Paid" else None

            txns.append({
                "source_file": source_name,
                "transaction_type": ttype,
                "direction": direction,
                "status": status,
                "amount_inr": amount,
                "currency": "INR" if amount is not None else None,
                "counterparty": party,
                "category_guess": category_guess,
                "payment_instrument": instr,
                "account_last4": last4,
                "datetime_local": dt_iso,
                "date": date_s,
                "time": time_12,
                "time_24h": time_24,
                "weekday": weekday,
                "month": month,
                "year": date_s.split("-")[0] if date_s else None,
                "product": "Google Pay",
                "details_id": details_id,
                "raw_text": sentence,
            })
        except Exception as e:
            logger.error("Failed parsing transaction at line %d: %s", i, e)

        i = next_start_at if next_start_at else (i + 1)

    logger.info("Parsed %d transactions", len(txns))
    return txns


def extract_single_file(input_file: Path, category_csv: Path) -> pd.DataFrame:
    """
    Extract transactions from a single Google Pay HTML file.

    Args:
        input_file: Path to HTML file.
        category_csv: Path to category CSV.

    Returns:
        DataFrame with extracted transactions.
    """
    logger.info("Starting extraction from file: %s", input_file)

    # Load categories
    cat_map = load_category_map(category_csv)

    try:
        raw = input_file.read_text(encoding="utf-8", errors="ignore")
    except Exception as e:
        logger.critical("Failed to read input file: %s", e)
        raise

    text = html_to_text_fast(raw)
    lines = [normalize(ln) for ln in text.splitlines() if normalize(ln)]
    rows = parse_text_lines(lines, source_name=input_file.name, category_map=cat_map)
    df = pd.DataFrame(rows)

    # Ensure all columns exist
    cols = [
        "source_file","transaction_type","direction","status","amount_inr","currency",
        "counterparty","category_guess","payment_instrument","account_last4",
        "datetime_local","date","time","time_24h","weekday","month","year",
        "product","details_id","raw_text"
    ]
    for c in cols:
        if c not in df.columns:
            df[c] = None
    df = df[cols]

    # Deduplicate
    def make_key(row):
        did = row.get("details_id")
        if pd.notna(did) and str(did).strip():
            return ("id", str(did).strip())
        return (
            "composite",
            row.get("transaction_type"),
            row.get("amount_inr"),
            row.get("currency"),
            (row.get("counterparty") or "").strip().lower(),
            (row.get("payment_instrument") or "").strip().lower(),
            row.get("account_last4"),
            row.get("datetime_local"),
        )
    keys = df.apply(make_key, axis=1)
    df = df.loc[~keys.duplicated()].reset_index(drop=True)

    # 🔹 Always write a common file name
    out_path = input_file.parent / "extracted_gpay_data.csv"
    try:
        df.to_csv(out_path, index=False, encoding="utf-8-sig")
        logger.info("Saved extracted data to %s", out_path)
    except Exception as e:
        logger.error("Failed to write output file: %s", e)

    return df



if __name__ == "__main__":
    df = extract_single_file(INPUT_FILE, CATEGORY_CSV)
    logger.info("Extraction complete. Rows: %d, Cols: %d", df.shape[0], df.shape[1])


2025-09-05 23:38:39,912 - INFO - Starting extraction from file: Shruti.html
2025-09-05 23:38:39,920 - INFO - Loaded category CSV with encoding: utf-8
2025-09-05 23:38:39,922 - INFO - Loaded 20 categories from category_list.csv
2025-09-05 23:38:40,164 - INFO - Parsing transactions from text lines...
2025-09-05 23:38:40,966 - INFO - Parsed 3497 transactions
2025-09-05 23:38:41,058 - INFO - Saved extracted data to extracted_gpay_data.csv
2025-09-05 23:38:41,062 - INFO - Extraction complete. Rows: 3497, Cols: 20
