In [None]:
# for harvesting the training data
# all of the modules and global variables are defined here
from sickle import Sickle
from pathlib import Path
import json

# Destination for fetched docs when they are large; this one points at a large
# SSD on the original author's machine. Change the string to match your system.
DEST_LARGE = Path("/mnt/d/data-large/").absolute()
# Destination under the current working directory, for when size is not a concern.
DEST_SMALL = Path.cwd() / "datasets/"
# Alternative destination under the current working directory.
DEST_SMALL_ALT = Path.cwd() / "datasets-alt/"
# General OAI-PMH-compliant repository to pull data from.
WORKING_REPO = "https://oai.datacite.org/oai/"
# umich OAI-PMH repository for deepblue/dspace.
UMICH_REPO = "https://deepblue.lib.umich.edu/dspace-oai/request/"
# Set identifier for library.
BHL_SET = "com_2027.42_65133"
# Other OAI-PMH endpoints that were used, keyed by a short label.
ENDPOINT_COLLECTION = {
    "IJHS": "https://www.ijhsonline.com/index.php/IJHS/oai",
    "IJESS": "https://journalkeberlanjutan.com/index.php/ijesss/oai",
    "Medan": "https://jurnal.medanresourcecenter.org/index.php/ICI/oai?",
    "YWNFR": "https://jurnal.ywnr.org/index.php/cfabr/oai",
    "UTOR": "https://symposia.library.utoronto.ca/index.php/symposia/oai",
}

In [None]:
def harvester(*args):
    """
    Harvest records from an OAI-PMH endpoint, one JSON file per record.

    Positional arguments, in order:
        dest (str or Path): Directory the JSON files are written to. It is
            created (including parents) when it does not exist.
        url (str): Base URL of the OAI-PMH endpoint.
        metadata_prefix (str): Sent as the ``metadataPrefix`` request argument.
        max_files (int or str): Harvesting stops once this many files have
            been written. A value of zero or less writes nothing.
        dataset (str, optional): Sent as the ``set`` request argument. Left
            out of the request entirely when not supplied.

    Each file is named after the record identifier with ":" and "/" replaced
    by "_", and contains the record's metadata as indented JSON. Records whose
    metadata is missing or cannot be serialised are counted as errors and
    skipped. Progress and the final counts are printed.

    Raises:
        ValueError: If the number of positional arguments is not four or five,
            or ``max_files`` cannot be converted to an integer.
        Exception: If an IndexError surfaces while iterating the records.
    """
    dataset = None

    # try to unpack five arguments first, then fall back to four
    try:
        dest, url, metadata_prefix, max_files, dataset = args
    except ValueError:
        dest, url, metadata_prefix, max_files = args

    dest = Path(dest)
    dest.mkdir(parents=True, exist_ok=True)
    max_files = int(max_files)

    filecount = 0
    errorcount = 0

    # nothing to harvest: report and leave before contacting the endpoint
    if max_files <= 0:
        print(f"Final filecount: {filecount}")
        print(f"Final errorcount: {errorcount}")
        return

    request_args = {"metadataPrefix": metadata_prefix}
    if dataset is not None:
        request_args["set"] = dataset

    sckl = Sickle(url)
    # the Sickle method for this verb is ListRecords (plural)
    records = sckl.ListRecords(**request_args)

    try:
        for rec in records:
            record_id = rec.header.identifier.replace(":", "_").replace("/", "_")
            try:
                metadata_json = json.dumps(rec.metadata, indent=2)
            except (AttributeError, TypeError) as e:
                print(f"skipped {record_id} due to json incompatibility: {e}")
                errorcount += 1
                continue

            filepath = dest / f"{record_id}.json"
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(metadata_json)
            print(f"wrote #{filecount}: {record_id}")
            filecount += 1

            if filecount >= max_files:
                break
    except IndexError as e:
        raise Exception(
            f"Error: {e} - there may be an issue with your call to the data source"
        ) from e

    # reported both when the limit is reached and when the source runs out first
    print(f"Final filecount: {filecount}")
    print(f"Final errorcount: {errorcount}")


def records_aggregator(records_path: str | Path) -> dict:

    if isinstance(records_path, str):
        records_path = Path(records_path)
    error_count = 0
    proc = {}
    rec = None

    for file in records_path.glob("*.json"):
        try:
            with open(file, "r", encoding="utf-8") as f:
                rec = json.load(f)
            for k in rec.keys():
                if k not in proc.keys() and k == "description":
                    proc[k] = [
                        v for v in rec[k] if v and not v.startswith("http")
                    ]
                elif k not in proc.keys():
                    proc[k] = rec[k]
                elif rec[k]:
                    for v in rec[k]:
                        if v not in proc[k]:
                            # to skip urls in umich descriptions, since they're more administrative
                            if (
                                "umich" in file.name
                                and k == "description"
                                and v
                                and v.startswith("http")
                            ):
                                continue
                            proc[k].append(v)
        except (json.JSONDecodeError, AttributeError, TypeError) as e:
            print(
                f"skipped {file} due to json incompatibility or similar issue"
            )
            print(f"Error code: {e}")
            error_count += 1

    print(f"Errors encountered: {error_count}")
    return proc


def flatten_aggregated_data(filepath: str | Path) -> list:
    """
    Flatten aggregated metadata into a list of training instances.

    This function reads an aggregated JSON file of metadata specified by the filepath.
    The file should contain a single JSON object where each key is a metadata field
    (e.g., "description") and its value is a list of corresponding metadata values.
    The function transforms this object into a flat list of dictionaries where each
    dictionary represents a training instance with two keys:
      - "text": a non-empty, stripped metadata value.
      - "label": the metadata field associated with the value.

    Args:
        filepath (str or Path): The path to the aggregated data JSON file.

    Returns:
        list: A list of dictionaries each with keys "text" and "label".

    Raises:
        Exception: If the file cannot be parsed due to JSON decoding errors,
                   attribute issues, or type incompatibility.
    """
    if isinstance(filepath, str):
        filepath = Path(filepath)

    try:
        with open(filepath, "r", encoding="utf-8") as f:
            aggregated_data = json.load(f)

        flattened_data = []

        # iterate over each field and its list of values.
        for field, values in aggregated_data.items():
            # for each metadata value in the list, create an individual training instance
            # each entry should be a dict with "label" and "text" keys,
            # where label is the metadata field and text is each corresponding value
            for value in values:
                # this checks if the value is a non-empty string
                if isinstance(value, str) and value.strip():
                    flattened_data.append(
                        {"text": value.strip(), "label": field}
                    )
    except (json.JSONDecodeError, AttributeError, TypeError) as e:
        raise Exception(
            f"failed due to json incompatibility or similar issue: {e} "
            "Check the formatting of your aggregated data file. It should be a single JSON object"
        )

    return flattened_data


def data_integrity_check(data: list, *labels) -> None:
    """
    Print a validity report for every training entry.

    For each entry the first failing check is reported and the rest are skipped:
      1. the "text" or "label" key is missing;
      2. "text" or "label" is not a string;
      3. "text" or "label" is blank after stripping;
      4. "label" is not one of ``labels``.
    An entry that passes all four checks is reported as valid.

    Args:
        data (list): List of dictionaries containing the training data.

        *labels: Labels to check against.
    """
    for position, entry in enumerate(data):
        present = entry.keys()
        if not ("text" in present and "label" in present):
            print(f"Error #1 in entry {position}: {entry}")
        elif not (
            isinstance(entry["text"], str) and isinstance(entry["label"], str)
        ):
            print(f"Error #2 in entry {position}: {entry}")
        elif not (entry["text"].strip() and entry["label"].strip()):
            print(f"Error #3 in entry {position}: {entry}")
        elif entry["label"] not in labels:
            print(f"Error #4 in entry {position}: {entry}")
        else:
            print(f"#{position} is valid")

In [3]:
import pandas as pd
from pathlib import Path
import json

# spreadsheet of language codes, looked up one directory above the current working directory
pt = Path.cwd().parent / Path("lang_codes.xlsx")

# only the first two columns of the sheet are loaded
# NOTE(review): presumably these are the code and the language name — confirm against the file
langs = pd.read_excel(pt, usecols=[0, 1])

In [4]:
# aggregated metadata file, read from one directory above the current working directory
# NOTE(review): the flatten cell further down reads "./aggregate_data_new.json"
# (current directory) — confirm which location is the intended one
dta = "../aggregate_data_new.json"

# parsed content is kept in dtb for inspection
with open(dta, "r", encoding="utf-8") as f:
    dtb = json.load(f)

In [None]:

# harvesting operation
# Calls the harvesting function with the arguments listed below, in order:
# destination directory, endpoint URL, metadata prefix, maximum number of files.
# Edit the list to harvest from a different source or into a different directory.

args = [DEST_SMALL_ALT, ENDPOINT_COLLECTION["UTOR"], "oai_dc", 2000]

# keep the destination around; the aggregation cell reads it back as d
d = args[0]
harvester(*args)


In [None]:

# aggregation operation
# This takes the destination used by the harvesting operation above, saved as d,
# and uses it as the path to the directory containing the harvested data.
# The data is aggregated into one dictionary and written to a single JSON file
# named after that directory ("<d>.json").
if not d:
    raise Exception("Need a destination for aggregation")
data_path = d

recs = records_aggregator(d)
# ensure_ascii=False writes non-ASCII characters as-is, so the encoding is pinned
# to UTF-8 (what the JSON readers in this notebook use) rather than left to the
# platform default
with open(f"{d}.json", "w", encoding="utf-8") as f:
    json.dump(recs, f, indent=2, ensure_ascii=False)


In [None]:
# alternate aggregator for more contextualized training data
aggregated_record = "aggregate_data_new.json"

# read as UTF-8 explicitly instead of relying on the platform default encoding
with open("raw_records.json", encoding="utf-8") as f:
    records = json.load(f)

# one example per non-empty field of every record
examples = []
for rec in records:
    for field, val in rec.items():
        if not val:
            continue
        if isinstance(val, str):
            snippet = val
        else:
            # multi-valued field: null entries are dropped and the rest coerced
            # to str, because str.join raises TypeError on non-string items
            parts = [str(item) for item in val if item is not None]
            if not parts:
                continue
            snippet = " ".join(parts)
        # build a "context" string of all the *other* fields
        context = " ".join(f"{k}: {v}" for k, v in rec.items() if k != field)
        examples.append({
            "text": snippet,
            "context": context,
            "label": field,
        })

In [None]:
data_path = "./aggregate_data_new.json"
# flatten operation
# Flattens the aggregated file into text/label instances and writes them out.
try:
    flat_data = flatten_aggregated_data(data_path)
    # ensure_ascii=False writes non-ASCII characters as-is, so pin the encoding
    with open("./flattened_data_bhl_set.json", "w", encoding="utf-8") as f:
        json.dump(flat_data, f, indent=2, ensure_ascii=False)
except Exception as e:
    # a bare string cannot be raised; wrap the message in an exception and keep
    # the original error attached as the cause
    raise RuntimeError(
        f"failed to flatten the aggregated data with the following exception: {e}"
    ) from e
# integrity check operation
# NOTE(review): no integrity check is actually run in this cell — confirm whether
# data_integrity_check should be called on flat_data here, and with which labels
print("Goodbye")


