In [None]:
import os
import logging
import numpy as np
import pandas as pd
from pathlib import Path
import sys
from datetime import datetime
import re
# ---------------- CONFIGURATION ----------------
# Share of printable bytes a file needs before it is treated as text.
MIN_TEXT_RATIO = 0.75           # heuristic threshold
# File that logging.basicConfig() below writes to.
LOG_FILE = "conversion.log"
# Folder scanned for *.001 files; this is a raw string, so the doubled
# backslashes are kept literally.
SOURCE_FOLDER = r"E:\\Automation\\Card_recon\\original reports"   # change this
# -----------------------------------------------


# ---------------- LOGGING SETUP ----------------
# Records at INFO and above go to LOG_FILE as "time | level | message".
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
    filename=LOG_FILE,
)
logging.info("===== Conversion started =====")
# -----------------------------------------------


def is_likely_text(data: bytes) -> bool:
    """
    Guess whether a byte string is text.

    A byte counts as text-like when it is printable ASCII (32..126) or one
    of tab, line feed, carriage return. The data is considered text when
    the fraction of such bytes is at least MIN_TEXT_RATIO. Empty input is
    never text.
    """
    if not data:
        return False

    text_like = 0
    for byte in data:
        if byte in (9, 10, 13) or 32 <= byte <= 126:
            text_like += 1

    return text_like / len(data) >= MIN_TEXT_RATIO


def convert_file(input_path: str):
    """
    Write a text-like input file out again as a UTF-8 ``.txt`` file.

    The output path is ``input_path`` with its extension replaced by
    ``.txt``. Empty files and files rejected by ``is_likely_text`` are
    logged as warnings and skipped; nothing is written for them.

    Args:
        input_path: Path of the file to convert.

    Returns:
        None.
    """
    base, _ = os.path.splitext(input_path)
    output_path = base + ".txt"

    with open(input_path, "rb") as f:
        raw = f.read()

    # Integrity check
    if not raw:
        logging.warning("Empty file skipped: %s", input_path)
        return

    # Reconstruction detection
    if not is_likely_text(raw):
        logging.warning(
            "Binary or archive split detected. Reconstruction required: %s",
            input_path,
        )
        return

    # Decode safely: bytes that are not valid UTF-8 are dropped.
    text = raw.decode("utf-8", errors="ignore")

    # newline="" writes the decoded line endings untouched. Default text mode
    # turns every "\n" into os.linesep, so CRLF input would come out as
    # "\r\r\n" on Windows.
    with open(output_path, "w", encoding="utf-8", newline="") as out:
        out.write(text)

    logging.info("Converted: %s -> %s", input_path, output_path)
    # \u2714 is the check mark; written as an escape so it survives any
    # re-encoding of this source file.
    print(f"\u2714 Converted: {os.path.basename(input_path)}")


def convert_folder(folder_path: str):
    """
    Run ``convert_file`` on every ``.001`` file directly inside a folder.

    Only regular files are picked up (a directory whose name happens to end
    in ``.001`` is ignored), the extension match is case-insensitive, and
    files are processed in sorted name order so runs are reproducible.
    Progress is printed to stdout.

    Args:
        folder_path: Folder to scan (not searched recursively).

    Returns:
        None.
    """
    files = sorted(
        name
        for name in os.listdir(folder_path)
        if name.lower().endswith(".001")
        # open() in convert_file would fail on a directory entry.
        and os.path.isfile(os.path.join(folder_path, name))
    )

    if not files:
        print("No .001 files found.")
        return

    total = len(files)
    print(f"Found {total} .001 files\n")

    for index, filename in enumerate(files, start=1):
        print(f"[{index}/{total}] Processing {filename}...")
        convert_file(os.path.join(folder_path, filename))

    print("\nConversion completed.")


# -------------------- RUN ----------------------
# Entry point: convert everything in SOURCE_FOLDER, or fail straight away
# when that folder is not an existing directory.
if __name__ == "__main__":
    if os.path.isdir(SOURCE_FOLDER):
        convert_folder(SOURCE_FOLDER)
        logging.info("===== Conversion finished =====")
    else:
        raise FileNotFoundError("Source folder does not exist")


In [None]:

# Configure file paths via environment variables or defaults.
# Each MC_* environment variable, when set, replaces the default next to it.
BASE_DIR = Path(os.environ.get('MC_BASE_DIR', r'E:\\Automation\\Card_recon\\original reports'))
LOE_FILE = Path(os.environ.get('MC_LOE_FILE', 'LOE20250219.xlsx'))
OUTGOING_FILE = Path(os.environ.get('MC_OUT_FILE', 'All Outgoing Transaction Details TWI_2025_02_20_080441.xlsx'))
# NOTE(review): the variable is called MC_SETT_DIR but its value is used as a
# file name under BASE_DIR — confirm with whoever sets it before renaming.
MC_SETT_FILE = Path(os.environ.get('MC_SETT_DIR', 'output.txt'))
LOE_PATH = BASE_DIR / LOE_FILE
OUTGOING_PATH = BASE_DIR / OUTGOING_FILE
MC_SETT_PATH = BASE_DIR / MC_SETT_FILE
# Logging setup: file + console. Log filename can be overridden with MC_LOG_FILE env var.
LOG_FILE = Path(os.environ.get('MC_LOG_FILE', 'conversion.log'))
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
# logging.basicConfig() silently does nothing when the root logger already
# has handlers (for instance after an earlier basicConfig call in the same
# session), which would leave the console handler below uninstalled. Drop
# and close whatever is attached first so this configuration always applies.
_root_logger = logging.getLogger()
for _old_handler in list(_root_logger.handlers):
    _root_logger.removeHandler(_old_handler)
    _old_handler.close()
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)
# Output directory for reports (override with MC_OUTPUT_DIR env var)
OUTPUT_DIR = Path(os.environ.get('MC_OUTPUT_DIR', r'E:\\Automation\\Card_recon\\MC_\\mc_reports'))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
logger.info(f'Configured paths: LOE_PATH={LOE_PATH} OUTGOING_PATH={OUTGOING_PATH} LOG_FILE={LOG_FILE} OUTPUT_DIR={OUTPUT_DIR}')


In [None]:
# Settlement-only verification (safe parser)
# Loads MC_SETT_PATH into mc_settl as a readability check: tab, pipe and
# comma separated layouts are tried in that order, then fixed-width.
# mc_settl stays an empty DataFrame when the file is missing or when no
# reader succeeds.
logger.info('Running settlement-only read to verify parsing of MC_SETT_PATH (no LOE/Outgoing re-read)')
mc_settl = pd.DataFrame()
if MC_SETT_PATH.exists():
    parsed = False
    # NOTE(review): a separator that never occurs in the file can still parse
    # (as a single column), so the first layout may be accepted even when it
    # is not the real one — confirm whether a column-count check is wanted.
    for sep, name in [('\t', 'TSV'), ('|', 'PIPE'), (',', 'CSV')]:
        try:
            mc_settl = pd.read_csv(str(MC_SETT_PATH), sep=sep, engine='python')
        except Exception as exc:
            # Keep going, but record why this layout was rejected instead of
            # discarding the error.
            logger.info(f'Could not read MC settlement as {name}: {exc}')
            continue
        logger.info(f'Read MC settlement as {name}: {MC_SETT_PATH} (shape={mc_settl.shape})')
        parsed = True
        break
    if not parsed:
        try:
            mc_settl = pd.read_fwf(str(MC_SETT_PATH))
            logger.info(f'Read MC settlement as fixed-width: {MC_SETT_PATH} (shape={mc_settl.shape})')
        except Exception:
            logger.exception(f'Failed to parse MC settlement file: {MC_SETT_PATH}')
            mc_settl = pd.DataFrame()
else:
    logger.warning(f'MC settlement not found: {MC_SETT_PATH}')
print('mc_settl shape:', getattr(mc_settl, 'shape', None))
# display() is only tried first; any failure (including the name not being
# defined) falls back to a plain-text dump.
try:
    display(mc_settl.head())
except Exception:
    print(mc_settl.head().to_string())



In [None]:
# Parse settlement blocks: extract header fields and transaction rows, save parsed CSVs
#
# Reads the settlement report text, splits it into report blocks at the
# 1IP727010 / 1IP727010-AA / 1IP727020 markers, and for every block collects
# the header fields plus one row per line that contains a decimal amount.
# All rows are written to mc_settl_parsed_combined.csv and the per-block
# BUSINESS SERVICE ID SUBTOTAL figures to bsid_subtotals.csv, both in out_dir.
logger.info('Extracting headers and transactions from MC settlement')
from pathlib import Path
out_dir = OUTPUT_DIR / 'mc_settl_parsed'
out_dir.mkdir(parents=True, exist_ok=True)

# Patterns are compiled once here rather than on every report line.
# The split marker uses a NON-capturing group: re.split() returns the text of
# capturing groups as extra list items, which would turn every marker into a
# bogus extra "block" and skew the block numbering.
_REPORT_SPLIT_RE = re.compile(r'(?=(?:1IP727010(?:-AA)?|1IP727020))')
_SUBTOTAL_RE = re.compile(
    r'BUSINESS SERVICE ID SUBTOTAL\s+([0-9,]+)\s+([-0-9,]+\.\d{2})\s*(CR|DR)?',
    re.IGNORECASE,
)
_AMOUNT_RE = re.compile(r'[-0-9,]+\.\d{2}')
# Count = integer (thousands separators allowed) directly before an amount.
# The lookbehind stops the match from starting inside another number, e.g.
# on the "21" of a preceding "10,202.21".
_COUNT_RE = re.compile(r'(?<![\d.,])(\d[\d,]*)\s+[-0-9,]+\.\d{2}')
_RECON_SIGN_RE = re.compile(r'([-0-9,]+\.\d{2})\s+(CR|DR)')
_FEE_SIGN_RE = re.compile(r'([-0-9,]+\.\d{2}).*(CR|DR)\s*$')


def _g(pattern):
    """Return group 1 of the first case-insensitive match of *pattern* in the
    current ``report_text`` (stripped), or '' when there is no match."""
    m = re.search(pattern, report_text, re.IGNORECASE)
    return m.group(1).strip() if m else ''


def _to_amount(token):
    """Convert a matched amount such as '10,202.21' to float.

    Returns None when the token is not a valid number (the amount pattern
    also accepts strings like '12-34.56'), so one odd line cannot abort the
    whole parse.
    """
    try:
        return float(token.replace(',', ''))
    except ValueError:
        return None


# obtain raw text from the settlement file (prefer original file)
if MC_SETT_PATH.exists():
    # Read as UTF-8, the encoding the converter cell writes its .txt output in,
    # instead of the platform default.
    txt = MC_SETT_PATH.read_text(encoding='utf-8', errors='ignore')
elif 'mc_settl' in globals():
    try:
        txt = mc_settl.to_string()
    except Exception:
        txt = ''
else:
    txt = ''
if not txt:
    logger.warning('No settlement text available to parse')
# split into report blocks using report id markers (accept optional -AA)
reports = _REPORT_SPLIT_RE.split(txt)
all_rows = []
summary_lines = []
block_idx = 0
for rep in reports:
    if not rep or not rep.strip():
        continue
    block_idx += 1
    report_text = rep
    # header fields
    accept_brand = _g(r'ACCEPTANCE BRAND:\s*(.+)')
    run_date = _g(r'RUN DATE:\s*([0-9/]+)')
    bs_level = _g(r'BUSINESS SERVICE LEVEL:\s*([A-Z0-9-]+)')
    bs_id = _g(r'BUSINESS SERVICE ID:\s*([0-9A-Z-]+)')
    file_id = _g(r'FILE ID:\s*([0-9/]+)')
    member_id = _g(r'MEMBER ID:\s*([0-9A-Z-]+)')
    # find subtotal line for this business service id (if present)
    subtotal_match = _SUBTOTAL_RE.search(report_text)
    subtotal_counts = subtotal_match.group(1).replace(',', '') if subtotal_match else ''
    subtotal_amount = subtotal_match.group(2).replace(',', '') if subtotal_match else ''
    if subtotal_match:
        # collected here for the subtotal summary file written after the loop
        summary_lines.append({'subtotal_counts': subtotal_counts, 'subtotal_amount': subtotal_amount})
    # parse transaction table lines: look for lines containing a numeric amount pattern
    rows = []
    for ln in report_text.splitlines():
        if not ln.strip():
            continue
        # match lines with an amount like 10,202.21
        amt_matches = _AMOUNT_RE.findall(ln)
        if not amt_matches:
            continue
        # attempt to find counts (an integer before an amount)
        cnt_m = _COUNT_RE.search(ln)
        counts = int(cnt_m.group(1).replace(',', '')) if cnt_m else None
        # first amount on the line is the recon amount, second (if any) the fee
        recon_amount = _to_amount(amt_matches[0])
        fee_amount = _to_amount(amt_matches[1]) if len(amt_matches) >= 2 else None
        # try to capture CR/DR indicators for recon and fee
        recon_sign = None
        fee_sign = None
        s_recon = _RECON_SIGN_RE.search(ln)
        if s_recon:
            recon_sign = s_recon.group(2)
        # NOTE(review): this picks up a CR/DR at the end of the line even when
        # the line has a single amount (fee_amount is None) — confirm intent.
        s_fee = _FEE_SIGN_RE.search(ln)
        if s_fee:
            fee_sign = s_fee.group(2)
        rows.append({
            'block_id': block_idx,
            'accept_brand': accept_brand,
            'run_date': run_date,
            'bs_level': bs_level,
            'bs_id': bs_id,
            'file_id': file_id,
            'member_id': member_id,
            # description / label (first 40 chars)
            'label': ln[:40].strip(),
            'counts': counts,
            'recon_amount': recon_amount,
            'recon_sign': recon_sign,
            'fee_amount': fee_amount,
            'fee_sign': fee_sign,
        })
    if rows:
        # Per-block export is currently switched off; the frame and target
        # path are still prepared so the two lines below can be re-enabled.
        df_blk = pd.DataFrame(rows)
        blk_file = out_dir / f'block_{block_idx:03d}_bs{bs_id}.csv'
        #df_blk.to_csv(blk_file, index=False)
        #logger.info(f'Wrote parsed block file: {blk_file}')
        all_rows.extend(rows)
    else:
        logger.info(f'No transaction rows parsed for block {block_idx} (bs_id={bs_id})')
# combined CSV
if all_rows:
    df_all = pd.DataFrame(all_rows)
    combined = out_dir / 'mc_settl_parsed_combined.csv'
    df_all.to_csv(combined, index=False)
    logger.info(f'Wrote combined parsed CSV: {combined}')
else:
    logger.warning('No parsed rows extracted from settlement text')
# also write a simple summary file for subtotals if available
if summary_lines:
    pd.DataFrame(summary_lines).to_csv(out_dir / 'bsid_subtotals.csv', index=False)
    logger.info('Wrote BUSINESS SERVICE ID SUBTOTAL file')
print('Parsing complete. Inspect', out_dir)
