In [1]:
# Libraries
from pathlib import Path
import csv

In [9]:
# Detect file patterns
def detect_and_save_imp_exp_patterns(
    data_dir: str = "data",
    out_dir: str = "support",
):
    """
    Scans all files in data_dir whose names match:
        - IMP_YYYY*.*
        - EXP_YYYY*.*
    detects unique header patterns separately for IMP and EXP,
    and creates FOUR files in 'support/':

    1) pattern_id_imp.csv
    2) pattern_id_exp.csv
    3) file_pattern_imp.csv
    4) file_pattern_exp.csv
    """

    base_path = Path(data_dir)
    if not base_path.exists():
        print(f"Directory '{data_dir}' does not exist. Nothing to do.")
        return

    out_path = Path(out_dir)
    out_path.mkdir(parents=True, exist_ok=True)

    # Separate maps + file lists
    pattern_imp: dict[str, int] = {}
    pattern_exp: dict[str, int] = {}
    file_rows_imp: list[tuple[str, int]] = []
    file_rows_exp: list[tuple[str, int]] = []

    # Helper to read first non-empty header
    def read_header_line(file_path: Path) -> str | None:
        for enc in ("utf-8-sig", "latin-1"):
            try:
                with file_path.open("r", encoding=enc, errors="strict") as f:
                    for line in f:
                        line = line.rstrip("\r\n")
                        if line.strip():
                            return line
                break
            except UnicodeDecodeError:
                continue

        # last fallback
        try:
            with file_path.open("r", encoding="latin-1", errors="ignore") as f:
                for line in f:
                    line = line.rstrip("\r\n")
                    if line.strip():
                        return line
        except Exception:
            return None
        return None

    # Collect IMP_ and EXP_ files
    all_files = sorted(
        f for f in base_path.iterdir()
        if f.is_file() and (f.name.startswith("IMP_") or f.name.startswith("EXP_"))
    )

    if not all_files:
        print(f"No IMP_YYYY or EXP_YYYY files found in '{data_dir}'.")
        return

    # Process headers
    for file_path in all_files:
        header = read_header_line(file_path)
        if header is None:
            print(f"Warning: could not read header from {file_path.name}, skipping.")
            continue

        fname = file_path.name

        if fname.startswith("IMP_"):
            if header not in pattern_imp:
                pattern_id = len(pattern_imp) + 1
                pattern_imp[header] = pattern_id
            else:
                pattern_id = pattern_imp[header]

            file_rows_imp.append((fname, pattern_id))

        else:  # EXP
            if header not in pattern_exp:
                pattern_id = len(pattern_exp) + 1
                pattern_exp[header] = pattern_id
            else:
                pattern_id = pattern_exp[header]

            file_rows_exp.append((fname, pattern_id))

    # ----------------------------------------
    # Write OUT FILES (always inside support/)
    # ----------------------------------------

    # 1) pattern_id_imp.csv
    with (out_path / "pattern_id_imp.csv").open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter=";")
        writer.writerow(["pattern_id_imp", "column_pattern"])
        for header, pid in sorted(pattern_imp.items(), key=lambda x: x[1]):
            writer.writerow([pid, header])

    # 2) pattern_id_exp.csv
    with (out_path / "pattern_id_exp.csv").open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter=";")
        writer.writerow(["pattern_id_exp", "column_pattern"])
        for header, pid in sorted(pattern_exp.items(), key=lambda x: x[1]):
            writer.writerow([pid, header])

    # 3) file_pattern_imp.csv
    with (out_path / "file_pattern_imp.csv").open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter=";")
        writer.writerow(["file_name", "pattern_id_imp"])
        for fname, pid in file_rows_imp:
            writer.writerow([fname, pid])

    # 4) file_pattern_exp.csv
    with (out_path / "file_pattern_exp.csv").open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter=";")
        writer.writerow(["file_name", "pattern_id_exp"])
        for fname, pid in file_rows_exp:
            writer.writerow([fname, pid])

# Scan the "data" folder and write the four header-pattern summary CSVs into "support".
detect_and_save_imp_exp_patterns(data_dir="data", out_dir="support")

In [10]:
# Merge Datasets
def merge_imp_exp_bases(
    data_dir: str = "data",
    encoding: str = "latin-1",
):
    """
    Merge all IMP_*.csv files into data_dir/IMP_COMPLETA.csv
    and all EXP_*.csv files into data_dir/EXP_COMPLETA.csv.

    Files are concatenated in name order. The first line of the first
    non-empty file of each group is written as the header; the first line of
    every later file is dropped. The two output files are never used as
    inputs, so the function can be re-run safely.

    Assumes:
    - All IMP files share one header pattern.
    - All EXP files share one header pattern.
    A warning is printed when a file's first line differs from the header
    that was kept; the file is still merged.

    Every written line is newline-terminated, so a source file that lacks a
    final newline cannot have its last row fused with the next file's first
    row.

    Parameters
    ----------
    data_dir : str
        Folder holding the yearly CSV files; the merged files are written
        to the same folder.
    encoding : str
        Encoding used both to read the inputs and to write the outputs.
        Undecodable bytes are dropped on read (``errors="ignore"``).

    Returns
    -------
    None
        Prints a message and returns early if ``data_dir`` is not a
        directory; prints a message for each group with no input files.
    """

    base_path = Path(data_dir)
    # is_dir() rather than exists(): a plain file here would make iterdir() raise.
    if not base_path.is_dir():
        print(f"Directory '{data_dir}' does not exist. Nothing to do.")
        return

    # Output files (must be excluded from inputs)
    imp_out = base_path / "IMP_COMPLETA.csv"
    exp_out = base_path / "EXP_COMPLETA.csv"
    output_names = {imp_out.name, exp_out.name}

    def collect(prefix: str) -> list[Path]:
        """Return the name-sorted .csv inputs in data_dir that start with prefix."""
        return sorted(
            f for f in base_path.iterdir()
            if f.is_file()
            and f.name.startswith(prefix)
            and f.suffix.lower() == ".csv"
            and f.name not in output_names
        )

    def terminated(line: str) -> str:
        """Return line guaranteed to end with a newline."""
        return line if line.endswith("\n") else line + "\n"

    imp_files = collect("IMP_")
    exp_files = collect("EXP_")

    if not imp_files:
        print(f"No IMP_YYYY.csv files found in '{data_dir}'.")
    if not exp_files:
        print(f"No EXP_YYYY.csv files found in '{data_dir}'.")

    def merge_group(files: list[Path], output_file: Path, label: str) -> None:
        """Concatenate files into output_file, keeping a single header line."""
        if not files:
            return

        print(f"Merging {len(files)} {label} files into {output_file.name} ...")

        header: str | None = None  # header kept so far, without line ending
        with output_file.open("w", encoding=encoding, newline="") as out_f:
            for fp in files:
                with fp.open("r", encoding=encoding, errors="ignore") as in_f:
                    first_line = next(in_f, None)
                    if first_line is None:
                        # Empty file: nothing to merge, and it must not count
                        # as "the first file" or the header would be lost.
                        continue

                    if header is None:
                        header = first_line.rstrip("\r\n")
                        out_f.write(terminated(first_line))
                    elif first_line.rstrip("\r\n") != header:
                        print(
                            f"Warning: header of {fp.name} differs from the "
                            f"header kept in {output_file.name}."
                        )

                    # Remaining lines are data rows.
                    for line in in_f:
                        out_f.write(terminated(line))

        print(f"✓ Saved: {output_file.name}")

    # Merge IMP → IMP_COMPLETA.csv
    merge_group(imp_files, imp_out, "IMP")

    # Merge EXP → EXP_COMPLETA.csv
    merge_group(exp_files, exp_out, "EXP")

# Build data/IMP_COMPLETA.csv and data/EXP_COMPLETA.csv from the IMP_/EXP_ CSV files in "data".
merge_imp_exp_bases("data")

Merging 29 IMP files into IMP_COMPLETA.csv ...
✓ Saved: IMP_COMPLETA.csv
Merging 29 EXP files into EXP_COMPLETA.csv ...
✓ Saved: EXP_COMPLETA.csv
