In [1]:
import csv
import re
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import os
import sys
import time

# --- Configuration ---------------------------------------------------------
# Root folder that is scanned for per-quarter sub-folders of raw .txt files.
ROOT_INPUT = Path(
    r"C:\Users\32011\Dropbox\Rajib\Call reports data\Raw Data\Input files"
)

# Root folder for the converted files; a sub-folder named after each quarter
# is created beneath it.
OUT_DIR = Path(
    r"C:\Users\32011\Dropbox\Rajib\Call reports data\Output\Intermediate folders\Parq. Quarters folder"
)

# Text encoding used when opening the schedule files.
ENCODING = "latin-1"

# Only schedules whose code starts with one of these prefixes are processed.
KEEP_SCHEDULE_PREFIXES = ("RC", "RI", "SU")

# Output format: 'parquet' (fast), 'xlsx' (readable, slower) or 'csv'.
SAVE_FORMAT = "parquet"

# Threads used to process the schedules of one quarter in parallel:
# at most 6, and 2 is assumed when the CPU count cannot be determined.
MAX_WORKERS = min(6, os.cpu_count() or 2)

In [2]:

# Patterns are compiled a single time here and reused by the functions in this cell.

# "(<n> of <m>)" part marker, e.g. "(1 of 2)".
PART_RE = re.compile(r"\((\d+)\s+of\s+(\d+)\)", re.IGNORECASE)

# "Schedule <code> <8 digits>": group 1 is the code text, group 2 the digits.
SCHEDULE_RE = re.compile(r"Schedule\s+(.+?)\s+(\d{8})", re.IGNORECASE)

# Any character that is neither a word character nor a hyphen/underscore.
NONWORD_RE = re.compile(r"[^\w\-_]")

def clean_token(s: str) -> str:
    """Normalize a raw text token (e.g. a header cell).

    Steps, in order:
      1. ``None`` becomes the empty string.
      2. Surrounding whitespace is removed.
      3. Wrapping double quotes, then wrapping single quotes, are removed.
      4. Whitespace is trimmed again, so blanks that sat *inside* the quotes
         (``'" IDRSSD "'``) do not survive as leading/trailing spaces.
      5. Every internal run of whitespace is collapsed to one space.

    Returns the cleaned string.
    """
    if s is None:
        return ""
    s = s.strip()
    s = s.strip('"').strip("'")
    # Second trim: removing the quotes can expose whitespace at either end.
    s = s.strip()
    s = re.sub(r"\s+", " ", s)
    return s

def read_schedule_file_stream(path: Path):
    """
    Read one tab-delimited schedule file and return its header and data rows.

    - The first record is the header; each token is cleaned with
      clean_token() and upper-cased.
    - The second record is always discarded (it is treated as the
      description line).
    - Every later record with at least one non-blank cell is kept, padded
      with "" or cut so it has exactly as many cells as the header.

    Returns (cols_list, rows_list): ([], []) for an empty file and
    (cols_list, []) when nothing follows the header.
    """
    with path.open("r", encoding=ENCODING, errors="replace", newline="") as handle:
        records = csv.reader(handle, delimiter="\t")

        # csv.reader only ever yields lists, so None safely marks exhaustion.
        header = next(records, None)
        if header is None:
            return [], []
        columns = [clean_token(name).upper() for name in header]

        # Drop the record that follows the header.
        if next(records, None) is None:
            return columns, []

        width = len(columns)
        data = []
        for record in records:
            # Ignore records whose cells are all empty or whitespace-only.
            if all(not cell or not cell.strip() for cell in record):
                continue
            shortfall = width - len(record)
            if shortfall > 0:
                record = record + [""] * shortfall
            elif shortfall < 0:
                record = record[:width]
            data.append(record)
    return columns, data

def extract_code_and_date_from_stem(stem: str):
    """Parse a file-name stem into (schedule_code, date, part).

    - part: (n, m) as ints when the stem contains "(n of m)", else None.
    - schedule_code / date: taken from "Schedule <code> <8 digits>" when that
      pattern matches; otherwise from a bare "Schedule <token>", in which
      case date is None.
    - The code is upper-cased with every non-word character removed.

    Returns (None, None, part) when no "Schedule ..." pattern is found.
    """
    part_match = PART_RE.search(stem)
    part = (int(part_match.group(1)), int(part_match.group(2))) if part_match else None

    dated = SCHEDULE_RE.search(stem)
    if dated:
        raw_code, date = dated.group(1).strip(), dated.group(2)
    else:
        # Fallback: accept a schedule token that is not followed by a date.
        undated = re.search(r"Schedule\s+([A-Z0-9]+[A-Z0-9I]*)", stem, flags=re.IGNORECASE)
        if not undated:
            return None, None, part
        raw_code, date = undated.group(1).strip(), None

    # Remove spaces and stray punctuation from the code before upper-casing.
    return re.sub(r"[^\w]", "", raw_code).upper(), date, part

def normalize_out_name(stem: str):
    """Turn a stem into a file-system friendly name.

    "(n of m)" markers become "_n_of_m", spaces become underscores, and any
    character matched by NONWORD_RE is then removed.
    """
    def _part_label(match):
        return f"_{match.group(1)}_of_{match.group(2)}"

    labelled = PART_RE.sub(_part_label, stem)
    underscored = labelled.replace(" ", "_")
    return NONWORD_RE.sub("", underscored)

def save_df(df: pd.DataFrame, out_file: Path, fmt: str) -> Path:
    """Write `df` in the requested format and return the path actually written.

    Parameters
    ----------
    df : DataFrame to persist (written without its index).
    out_file : target path; its parent directory is created if missing.
    fmt : "parquet" or "xlsx". Any other value writes CSV.

    Returns
    -------
    Path
        `out_file` when the parquet/xlsx write succeeded, otherwise
        `out_file` with its suffix replaced by ".csv". CSV is used both when
        it was requested and as the fallback after a failed parquet/xlsx
        write (a warning is printed in that case).

    Raises
    ------
    Whatever `DataFrame.to_csv` raises; a CSV failure is not swallowed.
    """
    out_file.parent.mkdir(parents=True, exist_ok=True)
    csv_file = out_file.with_suffix('.csv')

    if fmt in ("parquet", "xlsx"):
        write = df.to_parquet if fmt == "parquet" else df.to_excel
        try:
            write(out_file, index=False)
            return out_file
        except Exception as e:
            # Best effort: a missing engine or an unsupported frame should not
            # lose the data, so fall through to the CSV writer below.
            print(f"  ⚠ {fmt} write failed ({e}), falling back to csv: {csv_file}")

    df.to_csv(csv_file, index=False)
    return csv_file

def _schedule_out_path(qout, out_base, save_format):
    """Return the output path for `out_base` inside `qout`, with the extension for `save_format`."""
    extension = {"parquet": ".parquet", "xlsx": ".xlsx"}.get(save_format, ".csv")
    return qout / f"{normalize_out_name(out_base)}{extension}"


def process_schedule_group(schedule_code, items, qout, qname, save_format):
    """
    Convert all files belonging to one schedule of one quarter.

    items: list of tuples (Path, date, part)
    qout: output directory for the quarter
    qname: quarter name, used as a suffix of every output file name
    save_format: format passed on to save_df ('parquet', 'xlsx', else csv)

    Files that cannot be read, have no header, or whose header lacks an
    IDRSSD column are skipped with a warning. When more than one file remains
    and all headers are identical, the files are concatenated vertically into
    a single output; otherwise each file is saved on its own.

    Returns list of saved file paths (for logging)
    """
    start_t = time.time()
    saved_files = []
    # Sort by part (files without a part marker last), then by file name.
    # The (0, 0) placeholder is never compared against a real part tuple
    # because the first key element already separates the two cases.
    items_sorted = sorted(items, key=lambda x: (x[2] is None, x[2] or (0, 0), x[0].name))
    dfs = []
    headers = []
    filenames = []
    for fp, _date, _part in items_sorted:
        try:
            cols, rows = read_schedule_file_stream(fp)
        except Exception as e:
            print(f"    ⚠ Error reading {fp.name}: {e}")
            continue
        if not cols:
            print(f"    ⚠ No header found in {fp.name}, skipping")
            continue
        # quick check IDRSSD existence in header tokens
        if "IDRSSD" not in cols:
            print(f"    ⚠ Skipping {fp.name} — header lacks IDRSSD")
            continue
        # build DataFrame only once per file; values stay as strings
        dfs.append(pd.DataFrame(rows, columns=cols))
        headers.append(tuple(cols))
        filenames.append(fp.name)

    if not dfs:
        return saved_files

    # if multiple parts and headers identical -> vertical concat
    if len(dfs) > 1 and all(h == headers[0] for h in headers):
        merged_df = pd.concat(dfs, ignore_index=True)
        out_file = _schedule_out_path(qout, f"{schedule_code}_{qname}", save_format)
        # Prefer the path reported by save_df; if it reports nothing, the
        # requested path is recorded.
        written = save_df(merged_df, out_file, save_format) or out_file
        saved_files.append(written)
        elapsed = time.time() - start_t
        print(f"  Merged {len(dfs)} parts -> {written}  rows={len(merged_df)} cols={len(merged_df.columns)}  ({elapsed:.1f}s)")
        return saved_files

    # save parts individually (fast)
    for idx, (df, fname) in enumerate(zip(dfs, filenames), start=1):
        part_match = PART_RE.search(fname)
        if part_match:
            part_label = f"_{part_match.group(1)}_of_{part_match.group(2)}"
        elif len(dfs) > 1:
            # No "(n of m)" in the name: number the files to keep names unique.
            part_label = f"_part{idx}"
        else:
            part_label = ""

        out_file = _schedule_out_path(qout, f"{schedule_code}{part_label}_{qname}", save_format)
        written = save_df(df, out_file, save_format) or out_file
        saved_files.append(written)
        print(f"    Saved part -> {written}  rows={len(df)} cols={len(df.columns)}")
    return saved_files

def process_quarter(qfolder: Path):
    """Convert every kept schedule file found in one quarter folder.

    The quarter's output folder (OUT_DIR / folder name) is created first.
    The *.txt files are then grouped by schedule code, skipping stems that
    contain "POR" or "BULK", stems without a recognisable schedule code, and
    codes that do not start with one of KEEP_SCHEDULE_PREFIXES. Each group is
    handed to process_schedule_group on a thread pool, and a summary line is
    printed at the end. Returns None.
    """
    qname = qfolder.name
    qout = OUT_DIR / qname
    qout.mkdir(parents=True, exist_ok=True)
    t0 = time.time()
    print(f"\nProcessing quarter: {qname}  (path: {qfolder})")

    txt_files = sorted(qfolder.glob("*.txt"))
    if not txt_files:
        print("  No txt files, skipping")
        return

    # schedule code -> list of (path, date, part)
    groups = {}
    for txt_path in txt_files:
        stem = txt_path.stem
        upper_stem = stem.upper()
        # skip POR and bulk
        if "POR" in upper_stem or "BULK" in upper_stem:
            continue
        code, date, part = extract_code_and_date_from_stem(stem)
        if not code or not any(code.startswith(prefix) for prefix in KEEP_SCHEDULE_PREFIXES):
            continue
        if code not in groups:
            groups[code] = []
        groups[code].append((txt_path, date, part))

    if not groups:
        print("  No RC/RI/SU schedules found in this quarter.")
        return

    # One task per schedule; a failure in one schedule does not stop the others.
    saved_paths = []
    with ThreadPoolExecutor(max_workers=max(1, MAX_WORKERS)) as pool:
        pending = {}
        for code, items in groups.items():
            future = pool.submit(process_schedule_group, code, items, qout, qname, SAVE_FORMAT)
            pending[future] = code
        for future in as_completed(pending):
            try:
                saved_paths.extend(future.result())
            except Exception as e:
                print(f"  ⚠ Error processing schedule {pending[future]}: {e}")

    elapsed = time.time() - t0
    print(f"Completed quarter {qname}: processed {len(groups)} schedules, saved {len(saved_paths)} files  ({elapsed:.1f}s)")

# -----------------------------------------------
# Run for all quarters (sequentially over quarters)
# A missing input root is reported like an empty one instead of letting
# iterdir() raise before the message can be printed.
quarters = sorted(p for p in ROOT_INPUT.iterdir() if p.is_dir()) if ROOT_INPUT.is_dir() else []
if not quarters:
    # No sys.exit() here: it raises SystemExit, which a notebook kernel
    # reports as an exception. Simply skipping the loop is enough.
    print("No quarter folders found under:", ROOT_INPUT)
else:
    overall_start = time.time()
    for q in quarters:
        process_quarter(q)

    print("\nAll done. Total time:", time.time() - overall_start)


Processing quarter: 2001-03-31  (path: C:\Users\32011\Dropbox\Rajib\Call reports data\Raw Data\Input files\2001-03-31)
    Saved part -> C:\Users\32011\Dropbox\Rajib\Call reports data\Output\Intermediate folders\Parq. Quarters folder\2001-03-31\RCCII_2001-03-31.parquet  rows=8857 cols=2
    Saved part -> C:\Users\32011\Dropbox\Rajib\Call reports data\Output\Intermediate folders\Parq. Quarters folder\2001-03-31\RCA_2001-03-31.parquet  rows=8857 cols=19
    Saved part -> C:\Users\32011\Dropbox\Rajib\Call reports data\Output\Intermediate folders\Parq. Quarters folder\2001-03-31\RCD_2001-03-31.parquet  rows=8857 cols=21
    Saved part -> C:\Users\32011\Dropbox\Rajib\Call reports data\Output\Intermediate folders\Parq. Quarters folder\2001-03-31\RCEII_2001-03-31.parquet  rows=8857 cols=9
    Saved part -> C:\Users\32011\Dropbox\Rajib\Call reports data\Output\Intermediate folders\Parq. Quarters folder\2001-03-31\RCB_2001-03-31.parquet  rows=8857 cols=196
    Saved part -> C:\Users\32011\Drop