### Add original_id

In [27]:
import json

# Input/output file paths
jsonl_file = "../../11_annotations/gt_comparison/gt_100_comparison.jsonl"
json_file = "../../11_annotations/gt_comparison/gt_100_comparison.json"
output_file = "../../11_annotations/gt_comparison/out.jsonl"

# Load the mapping JSON
with open(json_file, "r", encoding="utf-8") as f:
    mapping_data = json.load(f)

# The mapping may be a dict of records or a list of records; the records
# themselves are handled identically, so normalise to one iterable first.
if isinstance(mapping_data, dict):
    mapping_records = list(mapping_data.values())
elif isinstance(mapping_data, list):
    mapping_records = mapping_data
else:
    mapping_records = []

# Build a dictionary: text -> original_id (forced to a stripped string).
# If the same text occurs more than once, the last occurrence wins.
text_to_id = {}
for content in mapping_records:
    data = content.get("data", {})
    text = data.get("text")
    original_id = data.get("original_id")
    if text and original_id is not None:
        text_to_id[text] = str(original_id).strip()

# Process JSONL and enrich every record with "original_id"
with open(jsonl_file, "r", encoding="utf-8") as infile, open(output_file, "w", encoding="utf-8") as outfile:
    for line in infile:
        # Blank lines (e.g. a trailing newline at the end of the file) are not
        # valid JSON and would make json.loads raise, so skip them.
        if not line.strip():
            continue
        entry = json.loads(line)

        # None when the text has no counterpart in the mapping file;
        # otherwise already a string (see text_to_id above).
        orig_id = text_to_id.get(entry.get("text"))

        new_entry = {}
        keys = list(entry.keys())
        if keys:
            # Keep the first key in front; an "id" in first position is stringified
            first_key = keys[0]
            first_value = entry[first_key]
            new_entry[first_key] = str(first_value) if first_key == "id" else first_value

        # "original_id" goes right after the first key (or alone for an empty record)
        new_entry["original_id"] = orig_id

        for k in keys[1:]:
            # A pre-existing "original_id" is superseded by the looked-up value
            if k != "original_id":
                new_entry[k] = entry[k]

        outfile.write(json.dumps(new_entry, ensure_ascii=False) + "\n")


print(f"✅ Done! Enriched JSONL saved as {output_file}")

✅ Done! Enriched JSONL saved as ../../11_annotations/gt_comparison/out.jsonl


### Remove aspect phrase for acsa

In [None]:
import json

input_path = "../../11_annotations/ground_truth/tasd_testset.jsonl"
output_path = "../../11_annotations/ground_truth/acsa_testset.jsonl"

with open(input_path, "r", encoding="utf-8") as src, \
     open(output_path, "w", encoding="utf-8") as dst:

    for raw_line in src:
        record = json.loads(raw_line)

        # Keep at most the first two fields of every label
        shortened = [lbl[:2] if len(lbl) > 2 else lbl for lbl in record["labels"]]

        # Drop labels that repeat within this record; the first occurrence stays
        kept = []
        already_seen = set()
        for lbl in shortened:
            key = tuple(lbl)  # lists are unhashable, tuples are not
            if key in already_seen:
                continue
            already_seen.add(key)
            kept.append(lbl)

        record["labels"] = kept
        dst.write(json.dumps(record, ensure_ascii=False) + "\n")


### Rename categories and check for conflict

In [1]:
import json

# German polarity names and their English replacements
POLARITY_MAP = {
    "Positiv": "positive",
    "Negativ": "negative",
    "Neutral": "neutral",
    "Konflikt": "conflict",
}

def transform_labels(sample):
    """Normalise the labels of one sample in place and return the sample.

    For every label the first element is lower-cased, the second element is
    translated through POLARITY_MAP (unknown values are kept) and lower-cased,
    and any further elements are carried over unchanged. A sample without a
    "labels" key ends up with an empty list.
    """
    def _convert(label):
        category = label[0].lower()
        polarity = POLARITY_MAP.get(label[1], label[1]).lower()
        tail = [] if len(label) <= 2 else label[2:]
        return [category, polarity] + tail

    sample["labels"] = [_convert(label) for label in sample.get("labels", [])]
    return sample

def process_jsonl(input_path, output_path):
    """Run transform_labels over every record of a JSONL file.

    Reads input_path line by line, skips lines that are empty or contain only
    whitespace, and writes each transformed record as one JSON line to
    output_path.
    """
    with open(input_path, "r", encoding="utf-8") as src, \
         open(output_path, "w", encoding="utf-8") as dst:

        for raw in src:
            if raw.strip():
                record = transform_labels(json.loads(raw))
                dst.write(json.dumps(record, ensure_ascii=False) + "\n")


# Named *_path instead of `input`/`output`: assigning to `input` would shadow
# the builtin input() for every cell that runs afterwards in the same kernel.
input_path = "../../11_annotations/few_shot/tasd/trainset_llm.jsonl"
output_path = "../../11_annotations/few_shot/tasd/trainset_llm_low.jsonl"
process_jsonl(input_path, output_path)


### Check for mistakes (Duplicates in Labels -> ACSA)

In [None]:
import json

# --- File to validate (alternatives kept for quick switching) ---
# input_file = "../../11_annotations/few_shot/acsa/acsa_llm_trainset.jsonl"
# input_file = "../../11_annotations/ground_truth/acsa_testset.jsonl"
input_file = "../../11_annotations/crowd/tasd/trainset_crowd_noconflict.jsonl"

# --- Validation: report conflict polarities and repeated labels per record ---
with open(input_file, "r", encoding="utf-8") as f:
    for line in f:
        entry = json.loads(line)
        eid = entry.get("id")
        labels = entry.get("labels", [])

        # Conflict polarity, in either German or English spelling
        if any(label[1].lower() in ("konflikt", "conflict") for label in labels):
            print(f"⚠️ Konflikt/Conflict found in id={eid}")

        # Labels that occur more than once within the record
        seen, duplicates = set(), set()
        for tup in map(tuple, labels):
            if tup in seen:
                duplicates.add(tup)
            else:
                seen.add(tup)

        if duplicates:
            print(f"⚠️ Duplicate labels in id={eid}: {duplicates}")
    print("✅ Validation complete.")


✅ Validation complete.


## Transform csv to jsonl

In [None]:
import csv
import json

# Source CSV and target JSONL
csv_file = "../../11_annotations/text_only/trainset_1000.csv"
jsonl_file = "../../11_annotations/text_only/trainset_1000.jsonl"

# Each CSV row becomes one JSON object (keyed by the header row) on its own line
with open(csv_file, newline='', encoding='utf-8') as f_csv, open(jsonl_file, 'w', encoding='utf-8') as f_jsonl:
    for row in csv.DictReader(f_csv):
        f_jsonl.write(json.dumps(row, ensure_ascii=False) + "\n")

print(f"CSV '{csv_file}' has been converted to JSONL '{jsonl_file}'")


## Display Aspect Phrases with special Cases

In [8]:
import json
import re

# INPUT_FILE = "../../11_annotations/experts/tasd_experts_trainset_low.jsonl"
INPUT_FILE = "../../11_annotations/ground_truth/tasd_testset_low.jsonl"

# Maximum number of special phrases printed at the end of the cell
MAX_SHOWN = 200

# regex: a phrase is "plain" when it consists only of the letters a-z/A-Z,
# the German umlauts, ß and spaces; anything else is reported as special
valid_phrase_re = re.compile(r"^[a-zA-ZöäüÖÄÜß ]+$")

specials = []

with open(INPUT_FILE, "r", encoding="utf-8") as f:
    for line in f:
        if not line.strip():
            continue
        entry = json.loads(line)

        for lbl in entry.get("labels", []):
            # Only labels with a third element carry an aspect phrase
            if len(lbl) >= 3:
                phrase = lbl[2]
                # "NULL" marks a missing phrase and is never reported
                if phrase != "NULL" and not valid_phrase_re.match(phrase):
                    specials.append({
                        "id": entry["id"],
                        "text": entry["text"],
                        "phrase": phrase
                    })

print(f"Found {len(specials)} special aspect phrases:")
for s in specials[:MAX_SHOWN]:  # show only the first MAX_SHOWN examples
    print(s["phrase"])

if len(specials) > MAX_SHOWN:
    print("... more not shown")


Found 43 special aspect phrases:
Bedienung, Susanna
Almhütten-Burger
Preis-Leistungs-Verhältnis
Fischplatte (für 1 Person
Preis- Leistung
Döner & Co angesiedelten Küche
Varieté Show
Mitarbeiter/innen
Preis-Leistungsverhältnis
Mezzelune ( schwarz
Araz (Kellner
Etablissement „Augustiner
Tapas-Auswahl
Gruppen-Events
RESTAURANT_NAME
Hauptgang, Rinderfilet
Soßen-Auswahl
Inhaber / Geschäftsführer
RESTAURANT_NAME
Nachtisch (hausgemachte LOC rote Grütze mit Vanillesauce (winziger Kleks
5 halbe, zu harte Pflaumen eine Kugel Movenpick Walnusseis und etwas Sprühsahne
Bedienung (Yassin
Preis-/Leistungsverhältnis
Fusili mit salciche3
Fisch & Co
Café
RESTAURANT_NAME
Norma- mit Tomatensauce und Auberginen
Location/Ambiente
RESTAURANT_NAME
Speise-Erlebnis
Bedienung, Lena
RESTAURANT_NAME
Hauptspeise "Calamares a la plancha
Preis-Leistungsverhältnis
Curry-Garnelen
Preis-Leistungsverhältnis
Preis/Leistungsverhältnis
LOCes Gose (Bier
Sushi-Rollen
RESTAURANT_NAME-Team
Preis-Leistungsverhältnis
Preis/Leistu

### Transform output to jsonl (Gemma 27B FS -> jsonl)

In [None]:
import os
import json

# Input and output base directories
base_dir = 'acsa_experts_llm_fs'
output_dir = 'acsa_experts_llm_fs_clean'

# Walk through all files below base_dir and clean every .json file
for root, _, files in os.walk(base_dir):
    for file in files:
        if file.endswith('.json'):
            file_path = os.path.join(root, file)
            print(f"Processing file: {file_path}")

            # Load JSON data; files that cannot be parsed are reported and skipped
            with open(file_path, 'r', encoding='utf-8') as f:
                try:
                    data = json.load(f)
                except json.JSONDecodeError as e:
                    print(f"Failed to load {file_path}: {e}")
                    continue

            # Keep only the selected keys (missing keys become None)
            cleaned_data = []
            for obj in data:
                cleaned_obj = {
                    'id': obj.get('id'),
                    'original_id': obj.get('original_id'),
                    'text': obj.get('text'),
                    'pred_label': obj.get('pred_label')
                }
                cleaned_data.append(cleaned_obj)

            # Construct output path, preserving subfolder structure
            rel_path = os.path.relpath(root, base_dir)  # relative to base_dir
            target_dir = os.path.join(output_dir, rel_path)
            os.makedirs(target_dir, exist_ok=True)

            out_file_path = os.path.join(target_dir, file)

            # Save cleaned JSON
            with open(out_file_path, 'w', encoding='utf-8') as f:
                json.dump(cleaned_data, f, ensure_ascii=False, indent=2)

            # Also save as JSONL, with "pred_label" renamed to "labels"
            out_file_path_jsonl = os.path.splitext(out_file_path)[0] + ".jsonl"
            with open(out_file_path_jsonl, 'w', encoding='utf-8') as f_jsonl:
                for obj in cleaned_data:
                    jsonl_entry = {
                        "id": obj["id"],
                        "original_id": obj["original_id"],
                        "text": obj["text"],
                        # pred_label is copied as-is (no wrapping or reshaping)
                        "labels": obj.get("pred_label")
                    }
                    f_jsonl.write(json.dumps(jsonl_entry, ensure_ascii=False) + "\n")

# Report the directory that was actually written to
print(f"Processing complete. Cleaned files are in '{output_dir}/'")


Processing file: z_in\crowd_acsa_raw.json
Processing complete. Cleaned files are in 'results_clean/'


## Remove and check for duplicates in the ACSA dataset

In [5]:
import json
import os
seed = "25"
input_file = f"acsa_crowd_llm_ft/acsa_crowd-acsa_test_orig-o_0.0002_16_5_{seed}.jsonl"  # original file
output_folder = "acsa_crowd_llm_ft_clean/"
os.makedirs(output_folder, exist_ok=True)
output_file = os.path.join(output_folder, f"acsa_crowd-acsa_test_orig-o_0.0002_16_5_{seed}.jsonl")

with open(input_file, "r", encoding="utf-8") as f_in, open(output_file, "w", encoding="utf-8") as f_out:
    for line in f_in:
        entry = json.loads(line)
        labels = entry.get("labels", [])

        # Remove duplicates while keeping the first occurrence of each label in
        # its original position. A plain set() would also deduplicate, but its
        # iteration order for tuples of strings changes between interpreter
        # runs, so the written file would not be reproducible.
        unique_labels = [list(t) for t in dict.fromkeys(tuple(lbl) for lbl in labels)]
        entry["labels"] = unique_labels

        # Save cleaned entry
        json.dump(entry, f_out, ensure_ascii=False)
        f_out.write("\n")

print(f"Cleaned file saved to: {output_file}")


Cleaned file saved to: acsa_crowd_llm_ft_clean/acsa_crowd-acsa_test_orig-o_0.0002_16_5_25.jsonl


In [16]:
import json

file_path = "acsa_crowd_llm_ft/acsa_crowd-acsa_test_orig-o_0.0002_16_5_25.jsonl"  # replace with your JSONL file
with open(file_path, "r", encoding="utf-8") as f:
    for line in f:
        entry = json.loads(line)
        labels = entry.get("labels", [])

        # A record has duplicates when collapsing its labels into a set of
        # tuples yields fewer items than the original list.
        # (No variable named `id` is created here: it was unused and would
        # shadow the builtin id() for the rest of the kernel session.)
        unique_labels = set(tuple(lbl) for lbl in labels)
        if len(unique_labels) < len(labels):
            print(f"Duplicate labels found in id: {entry.get('id')}")
    print("Check complete.")


Duplicate labels found in id: 648
Check complete.


### Convert CSV to JSONL

In [None]:
import csv
import json

# Parameters
input_file = "datasets/trainset_1000.csv"      # input CSV file
output_file = "datasets/trainset_1000.jsonl"    # output JSONL file

# Read the CSV and write one JSON object per row.
# newline="" is required by the csv module so that line breaks inside quoted
# fields are parsed by the reader instead of being translated by the file object.
with open(input_file, "r", newline="", encoding="utf-8") as csv_file, \
     open(output_file, "w", encoding="utf-8") as jsonl_file:

    reader = csv.DictReader(csv_file)  # rows are dicts keyed by the header row
    for row in reader:
        jsonl_file.write(json.dumps(row, ensure_ascii=False) + "\n")

print(f"Converted CSV {input_file} to JSONL {output_file}")


### Extract 30 examples from the 50 examples

In [None]:
import json
import random

# Paths and sampling parameters
input_file = "datasets/50_examples.jsonl"      # your input file
output_file = "datasets/30_examples.jsonl"   # file to save selected examples
sample_size = 30
seed = 14

# Seed the global generator so the same examples are drawn on every run
random.seed(seed)

# Load all records (one JSON object per line)
with open(input_file, "r", encoding="utf-8") as f:
    data = []
    for line in f:
        data.append(json.loads(line))

# Sampling without replacement needs at least sample_size records
if len(data) < sample_size:
    raise ValueError(f"Sample size {sample_size} is greater than dataset size {len(data)}.")

# Draw the random subset
sampled_data = random.sample(data, sample_size)

# Write the subset back as JSONL
with open(output_file, "w", encoding="utf-8") as f:
    f.writelines(json.dumps(item, ensure_ascii=False) + "\n" for item in sampled_data)

print(f"Saved {sample_size} random examples to {output_file}")


# Convert the original GERestaurant train set into the new format

In [12]:
import json

original_file = "../../11_annotations/text_only/trainset_1000.jsonl"  # File A
file_b = "../../04_data/datasets/gerestaurant/train.json"          # File B
new_file = "../../11_annotations/text_only/updatet_labels_original.jsonl"       # Output file
# category names used in File B -> category names written to the output
label_map = {
    "price": "preis",
    "ambience": "ambiente",
    "service": "service",
    "general impression": "gesamteindruck",
    "food": "essen",
}

# File A: one JSON record per line
with open(original_file, "r", encoding="utf-8") as fa:
    file_a_records = [json.loads(line) for line in fa]

# File B: despite the .json extension it is parsed one JSON record per line
with open(file_b, "r", encoding="utf-8") as fb:
    file_b_data = [json.loads(line) for line in fb]

# File B records keyed by their id as a string
file_b_lookup = {}
for entry in file_b_data:
    file_b_lookup[str(entry["id"])] = entry

# Attach File B's labels to every File A record whose original_id and text match
merged_records = []
for rec in file_a_records:
    orig_id = rec.get("original_id")
    b_entry = file_b_lookup.get(str(orig_id))
    if b_entry and rec["text"] != b_entry["text"]:
        # Same id but different text: keep the record unchanged and report it
        print(f"⚠️ Text mismatch for original_id {orig_id}")
    elif b_entry:
        mapped_labels = []
        for label in b_entry.get("labels", []):
            mapped_label = label.copy()
            # Translate the category when a mapping exists, otherwise keep it
            if label[0] in label_map:
                mapped_label[0] = label_map[label[0]]
            mapped_labels.append(mapped_label)
        rec["labels"] = mapped_labels
    # Records without a File B counterpart are passed through untouched
    merged_records.append(rec)

# Write the merged records as JSONL
with open(new_file, "w", encoding="utf-8") as out:
    out.writelines(json.dumps(rec, ensure_ascii=False) + "\n" for rec in merged_records)
print(f"✅ Done! Merged file saved as {new_file}")

✅ Done! Merged file saved as ../../11_annotations/text_only/updatet_labels_original.jsonl


### Compare Original and Expert Trainset

In [17]:
import json

# File paths of the two JSONL files whose labels are compared
file_original = "../../11_annotations/text_only/updatet_labels_original.jsonl"  # File A: read with ID offset 0
file_experts = "../../11_annotations/experts/tasd_experts_trainset_low.jsonl"  # File B: read with ID offset 1

# --- Helper: bring IDs of differently numbered files onto a common base ---
def normalize_id(id_str, offset=0):
    """Return the ID as a string after converting it to int and subtracting offset."""
    shifted = int(id_str) - offset
    return str(shifted)

# --- Load a JSONL file as {normalized id: record} ---
def read_jsonl(file_path, offset=0):
    """Read a JSONL file into a dict keyed by each record's normalized "id".

    The key is normalize_id(record["id"], offset=offset); when two records
    share a key, the later one replaces the earlier one.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        parsed = (json.loads(line) for line in f)
        return {normalize_id(rec["id"], offset=offset): rec for rec in parsed}

# The ID offsets are hard-coded, not detected: IDs of the original file are
# used as-is, while 1 is subtracted from every expert ID before matching
# (presumably because the expert file numbers its records from 1 — confirm).
original_records = read_jsonl(file_original, offset=0)
experts_records  = read_jsonl(file_experts,  offset=1)

# --- Compare labels ---
same_count = 0      # labels present in both files
changed_count = 0   # added + removed
added_count = 0     # labels only in the experts file
removed_count = 0   # labels only in the original file
org = 0             # total distinct labels per record, original file
new = 0             # total distinct labels per record, experts file
diffs = []          # one entry per added/removed label, for detailed inspection

for id_, rec_orig in original_records.items():
    rec_exp = experts_records.get(id_)
    if not rec_exp:
        continue  # ID missing in experts file

    # Records are only comparable when they annotate the same text
    if rec_orig["text"] != rec_exp["text"]:
        print(f"Text mismatch for ID {id_}")
        continue

    # Labels are compared as sets of tuples, so order and repeats are ignored
    labels_orig = set(tuple(l) for l in rec_orig.get("labels", []))
    labels_exp  = set(tuple(l) for l in rec_exp.get("labels", []))

    same = labels_orig & labels_exp
    added = labels_exp - labels_orig
    removed = labels_orig - labels_exp

    same_count += len(same)
    added_count += len(added)
    removed_count += len(removed)
    changed_count += len(added) + len(removed)
    org += len(labels_orig)
    new += len(labels_exp)

    # Detailed per-triplet diff
    for l in removed:
        diffs.append({"id": id_, "text": rec_orig["text"], "change": "removed", "label": l})
    for l in added:
        diffs.append({"id": id_, "text": rec_orig["text"], "change": "added", "label": l})

# --- Summary output ---
print("=== Label Comparison Summary ===")
print(f"Same triplets:    {same_count}")
print(f"Changed triplets: {changed_count}")
print(f"Added triplets:   {added_count}")
print(f"Removed triplets: {removed_count}")
print(f"Org: {org}")
print(f"New: {new}")




=== Label Comparison Summary ===
Same triplets:    1333
Changed triplets: 194
Added triplets:   92
Removed triplets: 102
Org: 1435
New: 1425


### Transform single GT Annotation from TASD to ACSA

In [7]:
import os
import json

# source folder (TASD annotations) and target folder (ACSA annotations)
input_folder = "../../11_annotations/ground_truth/Single_Annotations_TASD"
output_folder = "../../11_annotations/ground_truth/Single_Annotations_ACSA"
os.makedirs(output_folder, exist_ok=True)

# lower-cased polarity -> polarity written to the output
polarity_map = {
    "positiv": "positiv",
    "negativ": "negativ",
    "neutral": "neutral",
    "conflict": "konflikt",
}


def _tasd_to_acsa_label(label):
    """Reduce one label to [lower-cased category, mapped lower-cased polarity]."""
    category = label[0].lower()
    polarity = label[1].lower()
    return [category, polarity_map.get(polarity, polarity)]


# convert every .json file of the input folder, keeping the file name
for filename in os.listdir(input_folder):
    if not filename.endswith(".json"):
        continue

    input_path = os.path.join(input_folder, filename)
    output_path = os.path.join(output_folder, filename)

    with open(input_path, "r", encoding="utf-8") as infile, \
         open(output_path, "w", encoding="utf-8") as outfile:

        for line in infile:
            if not line.strip():
                continue
            data = json.loads(line)

            # labels with fewer than two elements are dropped
            data["labels"] = [
                _tasd_to_acsa_label(label)
                for label in data.get("labels", [])
                if len(label) >= 2
            ]
            outfile.write(json.dumps(data, ensure_ascii=False) + "\n")

## Remove Conflict and Duplicates for ACSA

In [None]:
import os
import json

# input and output folder paths
input_folder = "../../11_annotations/ground_truth/Single_Annotations_ACSA"
output_folder = "../../11_annotations/ground_truth/Single_Annotations_ACSA_processed"
os.makedirs(output_folder, exist_ok=True)

# polarity mapping (applied to the lower-cased polarity; unknown values are kept)
polarity_map = {
    "positiv": "positiv",
    "negativ": "negativ",
    "neutral": "neutral",
    "conflict": "konflikt"
}

# process each file
for filename in os.listdir(input_folder):
    if filename.endswith(".json"):  # treat .json as jsonl
        input_path = os.path.join(input_folder, filename)
        # Swap only the trailing extension. str.replace(".json", ".jsonl") would
        # also rewrite any ".json" that happens to occur earlier in the name.
        output_name = os.path.splitext(filename)[0] + ".jsonl"
        output_path = os.path.join(output_folder, output_name)

        with open(input_path, "r", encoding="utf-8") as infile, \
             open(output_path, "w", encoding="utf-8") as outfile:

            for line in infile:
                if not line.strip():
                    continue
                data = json.loads(line)

                new_labels = []
                for label in data.get("labels", []):
                    # labels with fewer than two elements are dropped
                    if len(label) >= 2:
                        category = label[0].lower()
                        polarity = label[1].lower()
                        polarity = polarity_map.get(polarity, polarity)

                        # skip konflikt
                        if polarity == "konflikt":
                            continue

                        new_labels.append((category, polarity))  # tuples are hashable

                # remove duplicates while preserving order (dict keys keep
                # insertion order), then convert the tuples back to lists
                unique_labels = [list(lbl) for lbl in dict.fromkeys(new_labels)]

                data["labels"] = unique_labels
                outfile.write(json.dumps(data, ensure_ascii=False) + "\n")

### Replace ids and transform to string (ids)

In [None]:
import os
import json
import re

# input and output folder paths
# NOTE(review): both values name a .jsonl *file*, yet the code below treats
# them as folders (os.listdir / os.makedirs), and input == output means files
# are overwritten in place. Confirm the intended directories before running.
input_folder = "../../11_annotations/text_only/updatet_labels_original.jsonl"
output_folder = "../../11_annotations/text_only/updatet_labels_original.jsonl"

# Fail early with a clear message. Without this check os.makedirs would either
# raise a confusing FileExistsError (path is an existing file) or silently
# create a directory named like the .jsonl file (path does not exist yet).
if not os.path.isdir(input_folder):
    raise NotADirectoryError(f"input_folder must be an existing directory: {input_folder}")
os.makedirs(output_folder, exist_ok=True)

# regex to extract start/end ids from filename (two underscore-delimited numbers)
pattern = re.compile(r".*_(\d+)_(\d+)_.*")

for filename in os.listdir(input_folder):
    if filename.endswith(".jsonl"):
        match = pattern.match(filename)
        if not match:
            print(f"⚠️ Skipping {filename}, cannot extract id range")
            continue

        start_id, end_id = map(int, match.groups())
        input_path = os.path.join(input_folder, filename)
        output_path = os.path.join(output_folder, filename)

        # Read everything first so that writing to the same path is safe.
        # Blank lines are dropped: they are not records, would shift the
        # assigned ids / line count, and would make json.loads raise.
        with open(input_path, "r", encoding="utf-8") as infile:
            lines = [line for line in infile if line.strip()]

        expected = end_id - start_id + 1
        if len(lines) != expected:
            print(f"⚠️ Warning: {filename} has {len(lines)} lines but expected {expected}")

        with open(output_path, "w", encoding="utf-8") as outfile:
            # Records are numbered consecutively starting at the file's start id
            for idx, line in enumerate(lines, start=start_id):
                data = json.loads(line)

                # assign new string id
                data["id"] = str(idx)

                outfile.write(json.dumps(data, ensure_ascii=False) + "\n")
