In [1]:
import os
import json
from pathlib import Path

# Project directories, all relative to the notebook's working directory
BASE_DIR = Path("..") / "data" / "raw_data"
OUTPUT_DIR = Path("..") / "data" / "processed_data"
VOCAB_DIR = Path("..") / "vocabularies"

# Make sure the output directory exists before any cell writes into it
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Read the corpus: one text per line, trimmed and lower-cased;
# lines that are empty after trimming are dropped.
with open(BASE_DIR / "env_data.txt", encoding="utf-8") as f:
    preprocessed_texts = [entry.lower() for entry in map(str.strip, f) if entry]


In [10]:
import ahocorasick

def build_automaton(vocab_terms):
    """Build an Aho-Corasick automaton from an iterable of vocabulary terms.

    Every term is registered as a key whose stored value is the term itself.

    Args:
        vocab_terms: Iterable of strings to add as keys.

    Returns:
        The ``ahocorasick.Automaton`` after ``make_automaton()`` was called.
    """
    trie = ahocorasick.Automaton()
    for word in vocab_terms:
        # Key and value are the same string
        trie.add_word(word, word)
    trie.make_automaton()
    return trie

def annotate_text_with_vocab(text, automaton, label):
    """Find whole-word vocabulary matches in ``text`` and tag them with ``label``.

    Args:
        text: The string to search.
        automaton: Object whose ``iter(text)`` yields ``(end_index, term)``
            pairs, where ``end_index`` is the inclusive index of the last
            matched character.
        label: Label attached to every returned span.

    Returns:
        A list of ``[start, end, label]`` lists with ``end`` exclusive, sorted
        by start index and, for equal starts, longest span first. Overlapping
        matches are all kept; no overlap filtering happens here.
    """
    text_length = len(text)
    matches = []

    for end_index, term in automaton.iter(text):
        start_index = end_index - len(term) + 1
        end_exclusive = end_index + 1

        # Whole-word check: the characters on either side of the match
        # (if any) must not be alphanumeric.
        if (start_index == 0 or not text[start_index - 1].isalnum()) and (
            end_exclusive == text_length or not text[end_exclusive].isalnum()
        ):
            matches.append([start_index, end_exclusive, label])

    # Sort by start index, then longer spans first. The length is negated so
    # that, for equal starts, the longest span really does come first.
    matches.sort(key=lambda span: (span[0], -(span[1] - span[0])))

    return matches


In [22]:
import inflect
from pathlib import Path

p = inflect.engine()

# Vocabulary to expand and the file that receives the expanded copy
VOCAB_DIR = Path("..") / "vocabularies"
input_path = VOCAB_DIR / "pollutant.txt"
output_path = VOCAB_DIR / "pollutant_with_plurals.txt"

# Load the vocabulary as a set of lower-cased, non-empty terms
with open(input_path, "r", encoding="utf-8") as f:
    original_terms = {entry.lower() for entry in map(str.strip, f) if entry}

# Start from the original terms, then add a plural for each single-word term
expanded_terms = set(original_terms)

for term in original_terms:
    if len(term.split()) != 1:
        continue  # multi-word terms are kept unchanged
    plural = p.plural(term)
    if plural and plural != term:
        expanded_terms.add(plural)

# Persist the expanded vocabulary, one term per line, in sorted order
with open(output_path, "w", encoding="utf-8") as f:
    for term in sorted(expanded_terms):
        f.write(f"{term}\n")

print(f"✅ Expanded {len(original_terms)} terms to {len(expanded_terms)} (with plurals added).")
print(f"📁 Saved to: {output_path}")


✅ Expanded 196 terms to 312 (with plurals added).
📁 Saved to: ..\vocabularies\meas_with_plurals.txt


In [6]:
# Vocabulary files to annotate with; each file name (minus ".txt") becomes
# the upper-cased entity label in the annotation loop.
theme_files = [
    "measurement.txt",
    "pollutant.txt",
    "env_process.txt",
    "habitat.txt",
    "taxonomy.txt",
]


In [11]:
def resolve_overlaps(entities):
    """Greedily drop entity spans that overlap an already accepted span.

    Spans are visited by ascending start index and, for equal starts, longest
    first. A span is accepted only if none of its character positions has
    been claimed by a previously accepted span.

    Args:
        entities: Iterable of ``(start, end, label)`` triples, ``end`` exclusive.

    Returns:
        A list of ``[start, end, label]`` lists ordered by start index.
    """
    ordered = sorted(entities, key=lambda span: (span[0], span[0] - span[1]))
    claimed = set()
    kept = []
    for begin, stop, tag in ordered:
        positions = range(begin, stop)
        if claimed.isdisjoint(positions):
            kept.append([begin, stop, tag])
            claimed.update(positions)
    kept.sort(key=lambda span: span[0])
    return kept


In [12]:
from collections import defaultdict
import time

start_time = time.time()

# Raw matches per text, accumulated across every theme. Texts that never
# match any vocabulary term get no entry and are therefore not written out.
text_to_annotations = defaultdict(list)

for fname in theme_files:
    # The label is the vocabulary file's stem, upper-cased
    # (e.g. "pollutant.txt" -> "POLLUTANT").
    theme_name = Path(fname).stem
    label = theme_name.upper()
    print(f"Annotating category: {label} from {fname}")

    with open(VOCAB_DIR / fname, encoding="utf-8") as f:
        vocab_terms = [line.strip().lower() for line in f if line.strip()]

    automaton = build_automaton(vocab_terms)

    for i, text in enumerate(preprocessed_texts):
        annotations = annotate_text_with_vocab(text, automaton, label)
        if annotations:
            text_to_annotations[text].extend(annotations)
        if (i + 1) % 100000 == 0:
            print(f"Processed {i + 1:,}/{len(preprocessed_texts):,} texts...")

# Resolve overlaps once per text, after all themes have contributed. Resolving
# after every theme could discard a span because of an earlier theme's span
# that is itself dropped later, and repeated the same work for each theme.
for text in text_to_annotations:
    text_to_annotations[text] = resolve_overlaps(text_to_annotations[text])

# Save final output
annotated_path = OUTPUT_DIR / "training_data.jsonl"
with open(annotated_path, "w", encoding="utf-8") as f:
    for text, annotations in text_to_annotations.items():
        json.dump({"text": text, "label": annotations}, f, ensure_ascii=False)
        f.write("\n")

end_time = time.time()
print(f"✅ Done! Annotated {len(text_to_annotations):,} unique texts.")
print(f"📁 Saved to: {annotated_path}")
print(f"⏱ Total time: {end_time - start_time:.2f} seconds")


Annotating category: MEASUREMENT from measurement.txt
Processed 100,000/564,547 texts...
Processed 200,000/564,547 texts...
Processed 300,000/564,547 texts...
Processed 400,000/564,547 texts...
Processed 500,000/564,547 texts...
Annotating category: POLLUTANT from pollutant.txt
Processed 100,000/564,547 texts...
Processed 200,000/564,547 texts...
Processed 300,000/564,547 texts...
Processed 400,000/564,547 texts...
Processed 500,000/564,547 texts...
Annotating category: ENV_PROCESS from env_process.txt
Processed 100,000/564,547 texts...
Processed 200,000/564,547 texts...
Processed 300,000/564,547 texts...
Processed 400,000/564,547 texts...
Processed 500,000/564,547 texts...
Annotating category: HABITAT from habitat.txt
Processed 100,000/564,547 texts...
Processed 200,000/564,547 texts...
Processed 300,000/564,547 texts...
Processed 400,000/564,547 texts...
Processed 500,000/564,547 texts...
Annotating category: TAXONOMY from taxonomy.txt
Processed 100,000/564,547 texts...
Processed 200

In [13]:
import random
import json
from pathlib import Path

INPUT_FILE = Path("..") / "data" / "processed_data" / "training_data.jsonl"
OUTPUT_FILE = Path("..") / "data" / "processed_data" / "sample_for_manual_testing.jsonl"

# Sampling parameters. A fixed seed makes the manual-review sample
# reproducible across kernel restarts.
SAMPLE_SIZE = 1000
SAMPLE_SEED = 42

all_data = []

# Load safely, skip blank or bad lines
with open(INPUT_FILE, "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        try:
            all_data.append(json.loads(line))
        except json.JSONDecodeError:
            print("⚠️ Skipped malformed line")

# Sample with a dedicated, seeded generator so the global `random` state is
# left untouched; never ask for more items than are available.
rng = random.Random(SAMPLE_SEED)
sampled = rng.sample(all_data, min(SAMPLE_SIZE, len(all_data)))

# Save sample
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
    for item in sampled:
        json.dump(item, f, ensure_ascii=False)
        f.write("\n")

print(f"✅ Sampled {len(sampled)} items to {OUTPUT_FILE}")


✅ Sampled 1000 items to ..\data\processed_data\sample_for_manual_testing.jsonl


In [None]:
import os
import json
import spacy
from pathlib import Path
from spacy.training.example import Example

# --- Directory Setup ---
# Paths are relative to the notebook's working directory, like every other
# cell in this notebook (which reads and writes under "../data"). The
# annotated file read here is the one the annotation cell writes.
BASE_DIR = Path("..") / "data" / "raw_data"
OUTPUT_DIR = Path("..") / "data" / "processed_data"
annotated_path = OUTPUT_DIR / "training_data.jsonl"
cleaned_path = OUTPUT_DIR / "cleaned_training_data.jsonl"

# --- Helper Functions ---
def has_overlapping_entities(entities):
    """Report whether any two entity spans overlap.

    Spans are ordered by start index and each one is compared with its
    immediate successor; an overlap exists when a span ends after the next
    one begins.

    Args:
        entities: Iterable of ``(start, end, label)`` triples.

    Returns:
        ``True`` if at least one neighbouring pair overlaps, else ``False``.
    """
    ordered = sorted(entities, key=lambda span: span[0])
    for (_, left_end, _), (right_start, _, _) in zip(ordered, ordered[1:]):
        if left_end > right_start:
            return True
    return False

def resolve_overlaps(entities):
    """Keep a non-overlapping subset of entity spans.

    Candidates are considered in order of start index, with the longest span
    first among those sharing a start. A candidate is kept only when every
    character position it covers is still free.

    Args:
        entities: Iterable of ``(start, end, label)`` triples, ``end`` exclusive.

    Returns:
        A list of ``[start, end, label]`` lists ordered by start index.
    """
    candidates = sorted(entities, key=lambda span: (span[0], span[0] - span[1]))
    taken = set()
    survivors = []
    for begin, stop, tag in candidates:
        covered = range(begin, stop)
        if taken.isdisjoint(covered):
            survivors.append([begin, stop, tag])
            taken.update(covered)
    survivors.sort(key=lambda span: span[0])
    return survivors

# --- SpaCy Setup ---
# Blank English pipeline, with the maximum accepted text length raised.
nlp = spacy.blank("en")
nlp.max_length = 5_000_000

# --- Load Annotated Data ---
# Blank lines are skipped so a stray empty line does not abort the load with
# a JSONDecodeError.
with open(annotated_path, "r", encoding="utf-8") as f:
    raw_data = [json.loads(entry) for entry in map(str.strip, f) if entry]

valid_data = []
invalid_data = []

for i, example in enumerate(raw_data):
    text = example["text"]
    annotations = example["label"]

    # Only pay for overlap resolution when an overlap is actually present
    if has_overlapping_entities(annotations):
        annotations = resolve_overlaps(annotations)

    doc = nlp(text)
    try:
        # Building an Example is used as the validity check: any exception
        # raised here marks the sample as invalid.
        # NOTE(review): spaCy may only warn (not raise) for spans that do not
        # align with token boundaries - confirm this check catches them.
        Example.from_dict(doc, {"entities": annotations})
        valid_data.append({"text": text, "label": annotations})
    except Exception as e:
        # Keep the failure details for later inspection instead of aborting
        invalid_data.append({
            "index": i,
            "error": str(e),
            "text": text,
            "label": annotations
        })

# --- Save Cleaned Data ---
with open(cleaned_path, "w", encoding="utf-8") as f:
    for item in valid_data:
        json.dump(item, f, ensure_ascii=False)
        f.write("\n")

print(f"Cleaned {len(valid_data)} valid samples.")
print(f"Skipped {len(invalid_data)} invalid samples.")
print(f"Saved cleaned annotations to: {cleaned_path}")


In [13]:
import spacy
from pathlib import Path

# Directory layout (relative to the notebook's working directory)
BASE_DIR = Path("..") / "data" / "raw_data"
OUTPUT_DIR = Path("..") / "data" / "processed_data"
VOCAB_DIR = Path("..") / "vocabularies"

# Taxonomy vocabulary in, lemmatised vocabulary out
input_path = VOCAB_DIR / "taxonomy.txt"
output_path = VOCAB_DIR / "taxonomy_lemmatized.txt"

# Pretrained English pipeline, used here for its token lemmas
nlp = spacy.load("en_core_web_sm")

# Load the taxonomy terms: trimmed, lower-cased, empty lines dropped
with open(input_path, "r", encoding="utf-8") as f:
    terms = [entry.lower() for entry in map(str.strip, f) if entry]

# Replace each term by the space-joined lemmas of its tokens; the set
# collapses terms that share the same lemmatised form.
lemmatised_terms = set()

for term in terms:
    lemmas = [token.lemma_ for token in nlp(term)]
    lemmatised_terms.add(" ".join(lemmas))

# Write the unique lemmatised terms in sorted order, one per line
lemmatised_sorted = sorted(lemmatised_terms)

with open(output_path, "w", encoding="utf-8") as f:
    for entry in lemmatised_sorted:
        f.write(f"{entry}\n")

print(f"Lemmatized {len(terms)} terms down to {len(lemmatised_terms)} unique ones.")
print(f"Saved to: {output_path}")


Lemmatized 4961 terms down to 4554 unique ones.
Saved to: ..\vocabularies\taxonomy_lemmatized.txt


In [2]:
from pathlib import Path

# File paths
VOCAB_DIR = Path("..") / "vocabularies"
input_path = VOCAB_DIR / "taxonomy.txt"
single_output = VOCAB_DIR / "taxonomy_single_word.txt"
multi_output = VOCAB_DIR / "taxonomy_multi_word.txt"

# Read taxonomy terms (trimmed, empty lines dropped)
with open(input_path, encoding="utf-8") as f:
    terms = [line.strip() for line in f if line.strip()]

# Separate by word count. De-duplicate and sort up front so the counts
# reported below match exactly what is written to each file.
single_word_terms = sorted({term for term in terms if len(term.split()) == 1})
multi_word_terms = sorted({term for term in terms if len(term.split()) > 1})

# Save them, one term per line
with open(single_output, "w", encoding="utf-8") as f:
    for term in single_word_terms:
        f.write(term + "\n")

with open(multi_output, "w", encoding="utf-8") as f:
    for term in multi_word_terms:
        f.write(term + "\n")

print(f"✅ Split complete. {len(single_word_terms)} single-word terms and {len(multi_word_terms)} multi-word terms saved.")


✅ Split complete. 4963 single-word terms and 66979 multi-word terms saved.
