In [2]:
import pandas as pd
import re, os

In [3]:
def find_most_recent_data(target, reaction, exfor_dir):
    """Find files under *exfor_dir* that mention a given reaction.

    A file is selected when its text contains the literal string
    ``REACTION   (<target><reaction>`` (three spaces after ``REACTION``) and
    an ``ENTRY <id> <8 digits>`` line can be found in it.

    Args:
        target: Target identifier, inserted verbatim into the search string.
        reaction: Reaction string, inserted verbatim right after *target*.
        exfor_dir: Root directory to scan. ``os.walk`` is used, so files in
            nested subdirectories are scanned as well.

    Returns:
        A list of ``(date, path)`` tuples sorted in descending order, where
        ``date`` is the 8-digit string taken from the first ``ENTRY`` line.
        The ordering is a plain string sort, which is chronological provided
        the stamp is written as YYYYMMDD.
    """
    # Both are loop-invariant, so build them once.
    reaction_tag = f"REACTION   ({target}{reaction}"
    entry_pattern = re.compile(r"ENTRY\s+\S+\s+(\d{8})")

    most_recent = []
    for folder, _, files in os.walk(exfor_dir):
        for file_name in files:
            path = os.path.join(folder, file_name)

            # Only the file access can realistically fail (undecodable bytes
            # are dropped by errors="ignore"), so guard just that and keep
            # scanning the remaining files when one cannot be read.
            try:
                with open(path, encoding="utf-8", errors="ignore") as working_file:
                    file_contents = working_file.read()
            except OSError as exception_text:
                print(f"Skipped {path}: {exception_text}")
                continue

            if reaction_tag not in file_contents:
                continue

            # Files without a recognisable ENTRY line are left out.
            entry_line = entry_pattern.search(file_contents)
            if entry_line:
                most_recent.append((entry_line.group(1), path))

    # Newest first; ties on the date fall back to comparing the paths.
    most_recent.sort(reverse=True)
    return most_recent

In [4]:
def parse_exfor_file(file_name):
    """Read one EXFOR text file and collect its data tables per sub-entry.

    Args:
        file_name: Path of the file to read. It is decoded as UTF-8 and
            undecodable bytes are dropped.

    Returns:
        A dict with the keys ``"entry_id"`` and ``"entry_date"`` (taken from
        the first ``ENTRY <id> <8 digits>`` match, or ``None`` when there is
        none) and ``"subentries"``, a list with one dict per sub-entry. Each
        of those holds ``"subentry_id"``, ``"subentry_date"`` (``None`` when
        no ``SUBENT <id> <8 digits>`` match is found) and ``"data"``, the
        list of non-empty DataFrames produced by ``create_dataframe``.
    """
    with open(file_name, encoding="utf-8", errors="ignore") as handle:
        text = handle.read()

    # Entry-level metadata; both values stay None without an ENTRY match.
    entry_id = entry_date = None
    entry_header = re.search(r"ENTRY\s+(\S+)\s+(\d{8})", text)
    if entry_header:
        entry_id, entry_date = entry_header.groups()

    # Plain substring test: any occurrence of "SUBENT" (which is also part of
    # "ENDSUBENT") makes the file count as having sub-entries.
    has_subentries = "SUBENT" in text
    if has_subentries:
        chunks = re.findall(r"(SUBENT\s+\S+.*?ENDSUBENT)", text, re.S)
    else:
        # No sub-entries: treat the whole file as a single chunk.
        chunks = [text]

    parsed_subentries = []
    for chunk in chunks:
        sub_header = re.search(r"SUBENT\s+(\S+)\s+(\d{8})", chunk)
        sub_id, sub_date = sub_header.groups() if sub_header else (None, None)

        # Everything between a "DATA" and the next "ENDDATA" is one table.
        blocks = re.findall(r"DATA(.*?)ENDDATA", chunk, re.S)
        if not (blocks or has_subentries):
            # Some single-entry files carry no DATA tags; parse the lot.
            blocks = [chunk]

        # Keep only the tables that actually contain rows.
        frames = [frame for frame in map(create_dataframe, blocks) if not frame.empty]

        parsed_subentries.append({
            "subentry_id": sub_id,
            "subentry_date": sub_date,
            "data": frames
        })

    return {
        "entry_id": entry_id,
        "entry_date": entry_date,
        "subentries": parsed_subentries
    }


# Helper function to detect the header row and units row of a data block
def read_in_headers_and_units(lines):
    """Locate the column-heading row and the units row in *lines*.

    The heading row is the first line (after a line starting with ``DATA``,
    when there is one) that begins with an energy heading such as ``EN``,
    ``EN-MIN`` or ``ENERGY`` (case-insensitive). The line right below it is
    taken as the units row when it contains a letter, ``/`` or ``*`` and is
    not a purely numeric row.

    Args:
        lines: List of text lines making up the block.

    Returns:
        A tuple ``(header, units, start_idx)``. ``header`` and ``units`` are
        lists of whitespace-separated tokens; ``units`` is ``None`` when no
        units row is recognised. ``start_idx`` is the index of the first line
        after the units row (or after the heading row when there are no
        units). ``(None, None, 0)`` is returned when no heading row is found.
    """
    def is_numeric_row(text):
        # True when every token parses as a float. Needed because numbers in
        # scientific notation ("1.0E+00") contain a letter and would
        # otherwise be mistaken for a units row.
        try:
            for token in text.split():
                float(token)
        except ValueError:
            return False
        return True

    # A "DATA" line may or may not be part of the block; when it is, only
    # the lines below it are searched for the heading row.
    data_index = next(
        (i for i, text in enumerate(lines) if text.strip().upper().startswith("DATA")),
        None,
    )
    search_start = 0 if data_index is None else data_index + 1

    for index in range(search_start, len(lines)):
        line = lines[index].strip()

        # Match heading patterns common in EXFOR
        if not re.match(r"^(EN-|EN\s|EN-MIN\b|EN-MAX\b|EN-RES\b|EN-EXP\b|ENERGY\b)", line, re.IGNORECASE):
            continue

        header = re.split(r"\s+", line)

        # The units, if present, sit on the line directly below the heading.
        units = None
        units_idx = index + 1
        if units_idx < len(lines):
            units_line = lines[units_idx].strip()
            if (
                units_line
                and re.search(r"[A-Za-z/*]", units_line)
                and not is_numeric_row(units_line)
            ):
                units = re.split(r"\s+", units_line)

        # Data starts below the units row, or right below the heading row
        # when no units row was recognised.
        start_idx = units_idx + 1 if units is not None else index + 1
        return header, units, start_idx

    # Fallback: no heading row found
    return None, None, 0


# Helper to do the actual data parsing
def parse_numeric_block(lines, start_index):
    """Parse whitespace-separated numeric rows into a DataFrame.

    Args:
        lines: List of text lines making up the block.
        start_index: Index of the first line to parse.

    Returns:
        A ``pandas.DataFrame`` with one row per line whose tokens all parse
        as floats. Parsing stops at the first line starting with
        ``ENDDATA``; blank lines and lines containing any non-numeric token
        are skipped. The frame is empty when no row qualifies.
    """
    rows = []

    for raw_line in lines[start_index:]:
        # Strip first so indented or padded lines are handled like any
        # other; splitting an unstripped line would yield empty tokens and
        # the whole row would be discarded.
        line = raw_line.strip()
        if line.startswith("ENDDATA"):
            break

        tokens = line.split()
        if not tokens:
            continue

        # Rows with anything that is not a number (headings, free text) are
        # skipped rather than aborting the whole block.
        try:
            rows.append([float(token) for token in tokens])
        except ValueError:
            continue

    return pd.DataFrame(rows)


# Helper to convert energy column to MeV
def convert_energy_to_mev(dataframe, unit):
    """Scale the first column of *dataframe* to MeV and relabel it.

    Args:
        dataframe: Table whose first column holds the energies. It is
            modified in place.
        unit: Unit of the first column (case-insensitive): ``EV``, ``KEV``,
            ``MEV`` or ``GEV``. A falsy value is treated as ``MEV`` and an
            unrecognised unit leaves the values unscaled.

    Returns:
        The same *dataframe* object, with its first column renamed to
        ``"Energy (MeV)"``.
    """
    mev_per_unit = {"EV": 1e-6, "KEV": 1e-3, "MEV": 1.0, "GEV": 1e3}

    # Missing unit -> assume MeV; unknown unit -> scale factor of 1.
    unit_key = unit.upper() if unit else "MEV"
    scale = mev_per_unit[unit_key] if unit_key in mev_per_unit else 1.0

    dataframe.iloc[:, 0] *= scale

    first_label = dataframe.columns[0]
    dataframe.rename(columns={first_label: "Energy (MeV)"}, inplace=True)
    return dataframe


# Helper to convert all reaction data to barns
def convert_reaction_data(dataframe, units):
    """Scale every column by its unit and append the unit to its label.

    Columns and units are paired positionally; columns beyond the end of
    *units* are left untouched. For each pair (unit upper-cased first):

    * ``NO-DIM`` is handled as ``B``.
    * A unit containing the letter ``B`` is scaled by the factor of the part
      before any ``*`` or ``/`` and the column is renamed to
      ``"<name> (b<suffix>)"``, where ``<suffix>`` is the ``*...`` or
      ``/...`` tail of the unit, if any.
    * Any other unit is scaled with the same table (an empty unit counts as
      ``MEV``) and the column is renamed to ``"<name> (MeV)"``.

    Units missing from the table are scaled by 1.0.

    Args:
        dataframe: Table to convert. It is modified in place.
        units: Sequence of unit strings, one per column.

    Returns:
        The same *dataframe* object.
    """
    scale_by_unit = {
        "B": 1.0, "MB": 1e-3, "UB": 1e-6, "NB": 1e-9,
        "PB": 1e-12, "KB": 1e3, "MB*EV": 1e-3, "MB/SR": 1e-3,
        "EV": 1e-6, "KEV": 1e-3, "MEV": 1.0, "GEV": 1e3
    }

    # zip() holds on to the original column index, so renaming columns
    # inside the loop does not disturb the iteration.
    for column, raw_unit in zip(dataframe.columns, units):
        unit_key = raw_unit.upper()
        if unit_key == "NO-DIM":
            unit_key = "B"

        if "B" not in unit_key:
            # Everything without a "B" is labelled as an energy in MeV.
            energy_unit = unit_key if unit_key else "MEV"
            dataframe[column] *= scale_by_unit.get(energy_unit, 1.0)
            dataframe.rename(columns={column: f"{column} (MeV)"}, inplace=True)
            continue

        # Only the leading part of a compound unit decides the scale.
        before_star, star, after_star = unit_key.partition("*")
        leading_unit = before_star.partition("/")[0]
        dataframe[column] *= scale_by_unit.get(leading_unit, 1.0)

        # Preserve the tail (*EV, /SR, ...); "*" takes precedence over "/".
        if star:
            tail = "*" + after_star
        else:
            _, slash, after_slash = unit_key.partition("/")
            tail = "/" + after_slash if slash else ""

        dataframe.rename(columns={column: f"{column} (b{tail})"}, inplace=True)

    return dataframe


# Fully parse block into dataframe
def create_dataframe(data_block):
    """Turn the text of one data block into a unit-converted DataFrame.

    Args:
        data_block: Raw text of the block.

    Returns:
        A ``pandas.DataFrame`` of the numeric rows with unit-annotated
        column labels, or an empty DataFrame when the block holds no
        numeric rows.
    """
    # Drop blank lines and surrounding whitespace before any detection.
    stripped_lines = (raw.strip() for raw in data_block.strip().splitlines())
    lines = [text for text in stripped_lines if text]

    header, unit_line, start_idx = read_in_headers_and_units(lines)

    dataframe = parse_numeric_block(lines, start_idx)
    if dataframe.empty:
        return dataframe

    # Use the detected heading only when it matches the table width;
    # otherwise fall back to generic Col1, Col2, ... labels.
    width = dataframe.shape[1]
    if header and len(header) == width:
        dataframe.columns = header
    else:
        dataframe.columns = [f"Col{i + 1}" for i in range(width)]

    # Without a units row, assume MeV for the first column and barns for
    # all the others.
    if not unit_line:
        unit_line = ["MEV"] + ["b"] * (width - 1)

    # Pad with empty units so every column has a counterpart.
    missing = len(dataframe.columns) - len(unit_line)
    if missing > 0:
        unit_line.extend([""] * missing)

    # convert_reaction_data handles every column, the first one included.
    return convert_reaction_data(dataframe, unit_line)

In [6]:
def read_END_CSV(filename):
    """Load a semicolon-separated file as an all-numeric DataFrame.

    Args:
        filename: Path or file-like object handed to ``pandas.read_csv``.

    Returns:
        A DataFrame with whitespace-stripped column names, without the rows
        that had a missing value when read, and with every column converted
        via ``pandas.to_numeric``. Values that cannot be converted become
        NaN; since this happens after the rows were dropped, such NaN values
        stay in the result.
    """
    raw_table = pd.read_csv(filename, sep=';', low_memory=False)

    # Tidy the header, then discard incomplete rows.
    table = raw_table.rename(columns=str.strip).dropna()

    # Force every column to numbers; unparseable cells turn into NaN.
    for name in table.columns:
        table[name] = pd.to_numeric(table[name], errors="coerce")

    return table