# Script to plot LLM and human evaluation scores of extracted synthesis procedures


In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from llm_synthesis.utils.style_utils import get_cmap, get_palette, set_style

# Module-level plotting resources from llm_synthesis.utils.style_utils.
# `palette` is indexed by the bar-plot code below and `cmap` is handed to the
# category plots at the end of the notebook.
cmap = get_cmap()
palette = get_palette()
# NOTE(review): presumably applies the project's matplotlib style globally —
# confirm in style_utils.
set_style()

In [None]:
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Location of the evaluation table. The absolute path only exists on the
# original author's machine; it is still tried first so behaviour there is
# unchanged. Everywhere else the notebook falls back to a file of the same
# name in the current working directory.
# NOTE(review): the fallback assumes the notebook is started from the
# directory that contains dummy_data.csv — confirm or adjust.
DUMMY_DATA_PATH = Path(
    "/Users/magdalenalederbauer/Code/lematerial-llm-synthesis/examples/notebooks/dummy_data.csv"
)
if not DUMMY_DATA_PATH.exists():
    DUMMY_DATA_PATH = Path("dummy_data.csv")

df = pd.read_csv(DUMMY_DATA_PATH)

# Find the 5 most common synthesis methods
synthesis_counts = df["synthesis_type"].value_counts()
top_5_synthesis = synthesis_counts.head(5).index.tolist()

# Filter data for top 5 synthesis methods
df_filtered = df[df["synthesis_type"].isin(top_5_synthesis)]

In [None]:
# Calculate average scores for each synthesis method
# The three LLM score columns that are plotted individually further down.
score_columns = [
    "material_extraction_score_llm",
    "process_steps_score_llm",
    "conditions_extraction_score_llm",
]
# Per-synthesis-type mean of each LLM score, limited to the rows kept in
# df_filtered. plot_bar below recomputes its own averages from `df`, so this
# frame is not read by the plotting functions visible in this notebook.
averages = df_filtered.groupby("synthesis_type")[score_columns].mean()


# Function to plot bar chart for a specific score
def plot_bar(df, score_column, show_values=False):
    """Plot the mean of ``score_column`` for the 5 most common synthesis types.

    Args:
        df: DataFrame with a ``synthesis_type`` column and ``score_column``.
        score_column: Name of the score column to average per synthesis type.
        show_values: When True, write each bar's mean (two decimals) above
            the bar. Defaults to False, which draws the bars unlabelled.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    # Find the 5 most common synthesis methods
    synthesis_counts = df["synthesis_type"].value_counts()
    top_5_synthesis = synthesis_counts.head(5).index.tolist()

    # Filter data for top 5 synthesis methods
    df_filtered = df[df["synthesis_type"].isin(top_5_synthesis)]

    # Calculate average scores for each synthesis method
    averages = (
        df_filtered.groupby("synthesis_type")[score_column].mean().sort_index()
    )

    # Position in the global `palette` used for each known score column.
    palette_index = {
        "material_extraction_score_llm": 0,
        "process_steps_score_llm": 1,
        "conditions_extraction_score_llm": 2,
        "material_extraction_score_human": 3,
        "process_steps_score_human": 4,
        "conditions_extraction_score_human": 5,
    }
    # Columns without an entry get color=None (matplotlib's default colour)
    # instead of failing with a KeyError.
    color_idx = palette_index.get(score_column)
    bar_color = palette[color_idx] if color_idx is not None else None

    # Create the plot
    plt.figure(figsize=(3, 3))
    bars = plt.bar(
        averages.index,
        averages.values,
        color=bar_color,
        linewidth=0.5,
    )

    # The y-axis label is derived from the column name; the axis is fixed to 0-5.
    title = score_column.replace("_", " ").title()
    plt.xlabel("Synthesis Method", fontsize=12)
    plt.ylabel(title, fontsize=12)
    plt.ylim(0, 5)
    plt.grid(axis="y", alpha=0.3)

    # Optionally add value labels on top of bars
    if show_values:
        for bar in bars:
            height = bar.get_height()
            if np.isnan(height):
                # A group whose scores are all missing has no mean to print.
                continue
            plt.text(
                bar.get_x() + bar.get_width() / 2,
                height,
                f"{height:.2f}",
                ha="center",
                va="bottom",
                fontsize=8,
            )

    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()


# Create the 3 separate plots: one per LLM score column, in the order the
# columns are listed in `score_columns` above.
for llm_score_column in score_columns:
    plot_bar(df, llm_score_column)

In [None]:
def plot_bar_comparison(
    df, llm_column, human_column, llm_color, human_color, show_values=False
):
    """Plot LLM vs. human mean scores side by side for the top 5 synthesis types.

    Args:
        df: DataFrame with a ``synthesis_type`` column and both score columns.
        llm_column: Name of the column holding the LLM scores.
        human_column: Name of the column holding the human scores.
        llm_color: Colour of the LLM bars.
        human_color: Colour of the human bars.
        show_values: When True, write each bar's mean (two decimals) above
            the bar. Defaults to False, which draws the bars unlabelled.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    # Find the 5 most common synthesis methods
    synthesis_counts = df["synthesis_type"].value_counts()
    top_5_synthesis = synthesis_counts.head(5).index.tolist()

    # Filter data for top 5 synthesis methods
    df_filtered = df[df["synthesis_type"].isin(top_5_synthesis)]

    # Both means come from the same grouping, so the two series share one
    # sorted index and can be paired by position below.
    grouped = df_filtered.groupby("synthesis_type")
    llm_averages = grouped[llm_column].mean().sort_index()
    human_averages = grouped[human_column].mean().sort_index()

    # Set up the plot
    plt.figure(figsize=(3, 3))

    # Each group gets two bars of width 0.4, centred on an integer position.
    bar_width = 0.4
    x_pos = np.arange(len(llm_averages))

    # Create bars
    llm_bars = plt.bar(
        x_pos - bar_width / 2,
        llm_averages.values,
        bar_width,
        label="LLM",
        color=llm_color,
        linewidth=0.5,
    )
    human_bars = plt.bar(
        x_pos + bar_width / 2,
        human_averages.values,
        bar_width,
        label="Human",
        color=human_color,
        linewidth=0.5,
    )

    # The y-axis label is the LLM column name without its "_llm" suffix.
    base_title = llm_column.replace("_llm", "").replace("_", " ").title()
    plt.xlabel("Synthesis Method", fontsize=12)
    plt.ylabel(base_title, fontsize=12)
    plt.ylim(0, 5)
    plt.grid(axis="y", alpha=0.3)

    # Set x-axis labels
    plt.xticks(x_pos, llm_averages.index, rotation=45)

    # Optionally add value labels on top of bars
    if show_values:
        for bar in [*llm_bars, *human_bars]:
            height = bar.get_height()
            if np.isnan(height):
                # A group whose scores are all missing has no mean to print.
                continue
            plt.text(
                bar.get_x() + bar.get_width() / 2,
                height,
                f"{height:.2f}",
                ha="center",
                va="bottom",
                fontsize=8,
            )

    # Add legend
    plt.legend(loc="upper right", fontsize=11)
    plt.tight_layout()
    plt.show()


# One entry per LLM-vs-human comparison plot, drawn in list order:
# (llm_column, human_column, palette index for LLM, palette index for human)
comparison_specs = [
    # Material extraction comparison
    (
        "material_extraction_score_llm",
        "material_extraction_score_human",
        0,
        1,
    ),
    # Process steps comparison
    ("process_steps_score_llm", "process_steps_score_human", 2, 3),
    # Conditions extraction comparison
    (
        "conditions_extraction_score_llm",
        "conditions_extraction_score_human",
        4,
        5,
    ),
    # Equipment extraction comparison
    (
        "equipment_extraction_score_llm",
        "equipment_extraction_score_human",
        2,
        5,
    ),
    # Semantic accuracy comparison
    ("semantic_accuracy_score_llm", "semantic_accuracy_score_human", 2, 5),
    # Format compliance comparison
    ("format_compliance_score_llm", "format_compliance_score_human", 0, 2),
    # Overall score comparison (disabled)
    # ("overall_score_llm", "overall_score_human", 0, 2),
]

# The palette is indexed inside the loop so each colour is looked up right
# before its own plot is drawn.
for llm_col, human_col, llm_idx, human_idx in comparison_specs:
    plot_bar_comparison(
        df,
        llm_column=llm_col,
        human_column=human_col,
        llm_color=palette[llm_idx],
        human_color=palette[human_idx],
    )

In [None]:
import json
import os
from collections import Counter
from typing import Any

import pandas as pd

from llm_synthesis.utils.style_utils import get_cmap, set_style


def load_result_json(file_path: str) -> list[dict[str, Any]]:
    """Read ``file_path`` as UTF-8 JSON and return the parsed content.

    Any failure while opening, reading or parsing the file is reported on
    stdout and an empty list is returned instead of raising.
    """
    try:
        with open(file_path, encoding="utf-8") as handle:
            return json.loads(handle.read())
    except Exception as err:
        print(f"Error loading {file_path}: {err}")
    return []


def _as_dict(value: Any) -> dict[str, Any]:
    """Return ``value`` if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def extract_synthesis_data(synthesis: dict[str, Any]) -> dict[str, Any]:
    """Extract evaluation scores and metadata from one synthesis record.

    Args:
        synthesis: One record with optional ``"evaluation"`` (holding a
            ``"scores"`` mapping), ``"synthesis"`` and ``"material"`` entries.
            Sections that are missing, ``None`` or not a dict are treated as
            empty instead of raising.

    Returns:
        A dict with four keys: ``"scores"`` (every entry of the scores
        mapping whose key ends in ``"_score"``, converted with ``float``;
        entries whose value is ``None`` are left out), ``"synthesis_method"``,
        ``"target_compound_type"`` and ``"material"`` (each ``None`` when
        absent).

    Raises:
        ValueError, TypeError: If a non-``None`` score cannot be converted by
            ``float``.
    """
    evaluation = _as_dict(synthesis.get("evaluation"))
    raw_scores = _as_dict(evaluation.get("scores"))
    # A None score means "not scored"; omitting it lets the callers'
    # nanmean-based averaging treat it as missing.
    scores = {
        key: float(value)
        for key, value in raw_scores.items()
        if key.endswith("_score") and value is not None
    }

    details = _as_dict(synthesis.get("synthesis"))
    return {
        "scores": scores,
        "synthesis_method": details.get("synthesis_method"),
        "target_compound_type": details.get("target_compound_type"),
        "material": synthesis.get("material"),
    }


def process_subdirectory(subdir_path: str) -> dict[str, Any]:
    """Aggregate the evaluated syntheses stored in ``<subdir_path>/result.json``.

    Args:
        subdir_path: Directory that contains a ``result.json`` file.

    Returns:
        An empty dict when the file cannot be loaded, is empty, or contains
        no record with scores. Otherwise a dict with:

        - ``"subdir_name"``: base name of ``subdir_path``
        - ``"scores"``: per score key, the NaN-ignoring mean over all kept
          records (a record that lacks a key counts as NaN for that key)
        - ``"synthesis_count"``: number of kept records
        - ``"synthesis_methods"``, ``"target_compound_types"``,
          ``"materials"``: one entry per kept record, in file order
    """
    result_file = os.path.join(subdir_path, "result.json")
    syntheses = load_result_json(result_file)
    if not syntheses:
        return {}

    all_scores, methods, types, materials = [], [], [], []
    for s in syntheses:
        # Skip records that were never evaluated. The check must look at the
        # raw record: the dict built by extract_synthesis_data has no
        # "evaluation" key, so indexing it there raised KeyError every time.
        if not isinstance(s, dict) or s.get("evaluation") is None:
            continue
        data = extract_synthesis_data(s)
        if not data["scores"]:
            continue
        all_scores.append(data["scores"])
        methods.append(data["synthesis_method"])
        types.append(data["target_compound_type"])
        materials.append(data["material"])

    if not all_scores:
        return {}

    all_categories = {k for scores in all_scores for k in scores}
    avg_scores = {
        cat: np.nanmean([s.get(cat, np.nan) for s in all_scores])
        for cat in all_categories
    }

    return {
        "subdir_name": os.path.basename(subdir_path),
        "scores": avg_scores,
        "synthesis_count": len(all_scores),
        "synthesis_methods": methods,
        "target_compound_types": types,
        "materials": materials,
    }


def analyze_metadata(results: list[dict[str, Any]]) -> dict[str, Any]:
    """Count synthesis methods and target compound types over all results.

    Falsy results and falsy method/type entries are ignored. The returned
    dict maps ``"synthesis_methods"`` and ``"target_compound_types"`` to
    plain ``{value: occurrences}`` dicts.
    """
    method_counter: Counter = Counter()
    type_counter: Counter = Counter()
    for entry in results:
        if not entry:
            continue
        # filter(None, ...) drops None / empty-string labels.
        method_counter.update(filter(None, entry.get("synthesis_methods", [])))
        type_counter.update(
            filter(None, entry.get("target_compound_types", []))
        )
    return {
        "synthesis_methods": dict(method_counter),
        "target_compound_types": dict(type_counter),
    }


def calculate_scores_by_category(
    my_dir: str, results: list[dict[str, Any]], category_field: str
) -> dict[str, dict[str, float]]:
    """Calculate average scores grouped by a specific category.

    For every non-empty entry of ``results`` the file
    ``<my_dir>/<subdir_name>/result.json`` is loaded again and each record is
    assigned to the category found under ``category_field`` in its extracted
    data.

    Args:
        my_dir: Directory that contains the per-paper subdirectories.
        results: Summaries carrying a ``"subdir_name"`` key; falsy entries
            are skipped.
        category_field: Key of the extracted data to group by, e.g.
            ``"synthesis_method"`` or ``"target_compound_type"``.

    Returns:
        ``{category: {score_key: mean}}`` using NaN-ignoring means; score
        keys whose mean is NaN are left out.
    """
    category_data: dict[str, list[dict[str, float]]] = {}
    for result in filter(None, results):
        result_file = os.path.join(
            my_dir, result["subdir_name"], "result.json"
        )
        for s in load_result_json(result_file):
            # Records without an evaluation carry no scores; skipping them
            # here keeps a null "evaluation" from crashing the extraction.
            if not isinstance(s, dict) or s.get("evaluation") is None:
                continue
            data = extract_synthesis_data(s)
            category = data.get(category_field)
            if category and data["scores"]:
                category_data.setdefault(category, []).append(data["scores"])

    category_averages = {}
    for category, score_lists in category_data.items():
        all_score_types = {k for scores in score_lists for k in scores}
        # A record that lacks a score key counts as NaN for that key.
        avg_scores = {
            st: np.nanmean([s.get(st, np.nan) for s in score_lists])
            for st in all_score_types
        }
        category_averages[category] = {
            k: v for k, v in avg_scores.items() if not np.isnan(v)
        }
    return category_averages


def plot_scores_by_category(
    scores_by_category: dict[str, dict[str, float]],
    category_name: str,
    counts: dict[str, int],
    cmap,
):
    """Draw one bar chart per score type, with bars coloured by entry count.

    ``scores_by_category`` maps category -> {score name: average}. For each
    score name a new figure is created with the categories sorted from
    highest to lowest average. Each bar's colour is ``cmap`` evaluated at the
    category's normalised value from ``counts`` (0 when the category is not
    in ``counts``), and a colourbar labelled "Number of Entries" is attached.
    Nothing is drawn when ``counts`` is empty. The figures are neither saved
    nor closed here.
    """
    score_table = pd.DataFrame.from_dict(scores_by_category, orient="index")
    if not counts:
        return

    # Colour scale spanning the smallest to the largest entry count.
    count_norm = plt.Normalize(
        vmin=min(counts.values()) or 0, vmax=max(counts.values()) or 1
    )
    category_label = category_name.replace("_", " ").title()

    for score_name in score_table.columns:
        fig, ax = plt.subplots()
        ranked = score_table[score_name].sort_values(ascending=False)
        colours = []
        for category in ranked.index:
            colours.append(cmap(count_norm(counts.get(category, 0))))

        ranked.plot(kind="bar", ax=ax, color=colours)

        score_label = score_name.replace("_", " ").title()
        ax.set_title(f"Average {score_label} by {category_label}")
        ax.set_xlabel(category_label)
        ax.set_ylabel("Average Score")
        ax.set_ylim(0, 5)
        plt.xticks(rotation=45, ha="right")

        # Stand-alone mappable so the colourbar reflects the count scale.
        mappable = plt.cm.ScalarMappable(cmap=cmap, norm=count_norm)
        mappable.set_array([])
        colourbar = fig.colorbar(mappable, ax=ax)
        colourbar.set_label("Number of Entries")

        plt.tight_layout()


def plot_average_of_other_scores(
    scores_by_category: dict[str, dict[str, float]],
    category_name: str,
    counts: dict[str, int],
    cmap,
):
    """Draw a bar chart of each category's mean over all non-overall scores.

    Every score column except ``"overall_score"`` is averaged per category.
    The categories are then plotted from highest to lowest mean, with bars
    coloured by ``cmap`` at the category's normalised value from ``counts``
    (0 when the category is not in ``counts``) and a colourbar labelled
    "Number of Entries". Nothing is drawn when there is no such score column
    or ``counts`` is empty.
    """
    score_table = pd.DataFrame.from_dict(scores_by_category, orient="index")
    other_columns = [
        name for name in score_table.columns if name != "overall_score"
    ]
    if not (other_columns and counts):
        return

    score_table["average_of_other_scores"] = score_table[other_columns].mean(
        axis=1
    )
    # Colour scale spanning the smallest to the largest entry count.
    count_norm = plt.Normalize(
        vmin=min(counts.values()) or 0, vmax=max(counts.values()) or 1
    )
    category_label = category_name.replace("_", " ").title()

    fig, ax = plt.subplots()
    ranked = score_table["average_of_other_scores"].sort_values(
        ascending=False
    )
    colours = []
    for category in ranked.index:
        colours.append(cmap(count_norm(counts.get(category, 0))))

    ranked.plot(kind="bar", ax=ax, color=colours)

    ax.set_title(f"Average of Other Scores by {category_label}")
    ax.set_xlabel(category_label)
    ax.set_ylabel("Average Score")
    ax.set_ylim(0, 5)
    plt.xticks(rotation=45, ha="right")

    # Stand-alone mappable so the colourbar reflects the count scale.
    mappable = plt.cm.ScalarMappable(cmap=cmap, norm=count_norm)
    mappable.set_array([])
    colourbar = fig.colorbar(mappable, ax=ax)
    colourbar.set_label("Number of Entries")

    plt.tight_layout()

In [None]:
# NOTE(review): `output_file` is not read anywhere in this cell — remove it or
# wire up the CSV export it was meant for.
output_file: str = "synthesis_evaluation_results.csv"
# NOTE(review): absolute path into one user's home directory; the cell only
# runs on that machine. Consider a repo-relative or configurable path.
MY_DIR = "/Users/magdalenalederbauer/Code/lematerial-llm-synthesis/annotations/"
my_dir = MY_DIR


# Names of the immediate child directories of my_dir (plain files are skipped).
subdirs = [
    d for d in os.listdir(my_dir) if os.path.isdir(os.path.join(my_dir, d))
]

# Best-effort version of
#   results = [process_subdirectory(os.path.join(my_dir, s)) for s in subdirs]
# A subdirectory that raises is reported by name and error, then skipped.
# Empty dicts returned by process_subdirectory are appended as well; both
# analyze_metadata and calculate_scores_by_category ignore falsy entries.
results = []
for s in subdirs:
    try:
        results.append(process_subdirectory(os.path.join(my_dir, s)))
    except Exception as e:
        print(s)
        print(e)

# Occurrence counts per synthesis method and per target compound type; they
# are passed to the plot functions below as the `counts` argument.
metadata = analyze_metadata(results)

# Average scores per synthesis method: one figure per score type, then one
# figure for the mean of all scores except "overall_score".
method_scores = calculate_scores_by_category(
    my_dir, results, "synthesis_method"
)
if method_scores:
    plot_scores_by_category(
        method_scores,
        "synthesis_method",
        metadata["synthesis_methods"],
        cmap,
    )
    plot_average_of_other_scores(
        method_scores,
        "synthesis_method",
        metadata["synthesis_methods"],
        cmap,
    )


# Same two plot families, grouped by target compound type.
type_scores = calculate_scores_by_category(
    my_dir, results, "target_compound_type"
)
if type_scores:
    plot_scores_by_category(
        type_scores,
        "target_compound_type",
        metadata["target_compound_types"],
        cmap,
    )
    plot_average_of_other_scores(
        type_scores,
        "target_compound_type",
        metadata["target_compound_types"],
        cmap,
    )

print("Plot generation complete.")