In [12]:
from utils import Evaluation, CriteraFeedback

In [7]:
# Inspect the fields declared on the Evaluation model (field name -> FieldInfo).
Evaluation.model_fields

{'introduction': FieldInfo(annotation=CriteraFeedback, required=True, description='Analysis of the introduction of the essay with these criteria: 1. Clarity of thesis statement, 2. Engagement and relevance of opening statements'),
 'structure': FieldInfo(annotation=CriteraFeedback, required=True, description="Analasis of the structure of the essay's body with these criteria: 1. Organization and clarity of paragraphs, 2. Logical flow of ideas"),
 'argumentation': FieldInfo(annotation=CriteraFeedback, required=True, description='Analysis of the argumentation of the essay with these criteria: 1. Strength and clarity of arguments, 2. Use of critical reasoning'),
 'evidence': FieldInfo(annotation=CriteraFeedback, required=True, description='Analysis of the evidence used in the essay with these criteria: 1. Relevance and quality of evidence, 2. Use of citations and references'),
 'conclusion': FieldInfo(annotation=CriteraFeedback, required=True, description='Analysis of the conclusion of the

In [10]:
# Drill into the model that annotates the 'introduction' field and list its own fields.
Evaluation.model_fields['introduction'].annotation.model_fields

{'strengths': FieldInfo(annotation=List[str], required=True, description='Concise list of strengths for this criteria'),
 'weaknesses': FieldInfo(annotation=List[str], required=True, description='Concise list of weaknesses for this criteria'),
 'suggestions': FieldInfo(annotation=List[str], required=True, description='Concise list of suggestions provided that are not weaknesses')}

In [37]:
from typing import List, Type, Dict, Any, get_origin, get_args
from pydantic import BaseModel
from collections import defaultdict

def remove_duplicates(items):
    """Return a new list without repeated entries, keeping first-seen order.

    Only ``str``, ``int``, ``float`` and ``tuple`` items are deduplicated.
    Every other item (for example a dict or a list) is copied through
    unchanged, as is a tuple whose contents turn out to be unhashable.

    Args:
        items: Iterable of values to filter.

    Returns:
        A new list; ``items`` itself is not modified.
    """
    seen = set()
    unique_items = []
    for item in items:
        if isinstance(item, (str, int, float, tuple)):
            try:
                is_new = item not in seen
            except TypeError:
                # A tuple can hold unhashable members (e.g. ``([1],)``), in
                # which case the set lookup fails. Treat it like any other
                # non-hashable item instead of crashing.
                unique_items.append(item)
                continue
            if is_new:
                seen.add(item)
                unique_items.append(item)
        else:
            # Non-hashable items (like dicts) are added directly without deduplication.
            unique_items.append(item)
    return unique_items

def aggregate_dict(aggregated: Dict[str, Any], data: Dict[str, Any]):
    """Fold ``data`` into ``aggregated`` in place.

    List values are appended to the list stored under the same key, and dict
    values are merged recursively into a nested ``defaultdict(list)``. Values
    of any other type are skipped.

    Args:
        aggregated: Accumulator that is mutated by this call.
        data: Mapping whose list and dict values are merged into the accumulator.
    """
    for name in data:
        incoming = data[name]
        if isinstance(incoming, list):
            # setdefault stores a fresh list on first sight, so the caller's list is never aliased.
            aggregated.setdefault(name, []).extend(incoming)
        elif isinstance(incoming, dict):
            nested = aggregated.setdefault(name, defaultdict(list))
            aggregate_dict(nested, incoming)

def deduplicate_dict(data: Dict[str, Any]):
    """Deduplicate every list found in ``data``, in place.

    Each list value is replaced by the result of ``remove_duplicates``; dict
    values are processed recursively. Other values are left untouched.
    """
    for field_name in data:
        entry = data[field_name]
        if isinstance(entry, dict):
            deduplicate_dict(entry)
        elif isinstance(entry, list):
            # Rebinding an existing key while iterating is safe: the key set does not change.
            data[field_name] = remove_duplicates(entry)

def aggregate_feedback(feedbacks: List[BaseModel], feedback_class: Type[BaseModel]):
    """Merge several feedback models into a single ``feedback_class`` instance.

    Each feedback is converted to plain dicts and lists. List values found
    under the same key path are concatenated in input order
    (``aggregate_dict``), repeated entries are dropped (``deduplicate_dict``),
    and the merged data is used to build a new ``feedback_class`` object.

    Args:
        feedbacks: Models to merge. They are expected to share the field
            layout of ``feedback_class``.
        feedback_class: Pydantic model class of the returned object.

    Returns:
        A new ``feedback_class`` instance built from the merged data.

    Note:
        Only list values (and dicts containing them) are carried through the
        merge. Fields holding plain scalars are absent from the merged data
        and are therefore not passed to ``feedback_class``.
    """

    def to_dict(obj: Any) -> Any:
        """Convert models (and lists of models) to plain Python containers."""
        if isinstance(obj, BaseModel):
            return {k: to_dict(v) for k, v in obj.model_dump().items()}
        elif isinstance(obj, list):
            return [to_dict(v) for v in obj]
        else:
            return obj

    def is_model_class(tp: Any) -> bool:
        """Return True only when ``tp`` is a real class derived from BaseModel."""
        try:
            return isinstance(tp, type) and issubclass(tp, BaseModel)
        except TypeError:
            # Parameterised generics can pass the isinstance check on some
            # Python versions yet still be rejected by issubclass().
            return False

    def from_dict(cls: Type[BaseModel], data: Dict[str, Any]) -> BaseModel:
        """Rebuild ``cls`` from merged data, recursing into nested models."""
        field_values = {}
        for name, field in cls.model_fields.items():
            if name not in data:
                continue
            value = data[name]
            annotation = field.annotation
            origin = get_origin(annotation)
            # get_origin() may return a non-class special form (e.g. typing.Union
            # for Optional[...]); issubclass() raises TypeError on those, so
            # confirm it is a class first.
            if isinstance(origin, type) and issubclass(origin, list):
                type_args = get_args(annotation)
                item_type = type_args[0] if type_args else None
                if is_model_class(item_type):
                    field_values[name] = [from_dict(item_type, v) for v in value]
                else:
                    field_values[name] = value
            elif is_model_class(annotation):
                field_values[name] = from_dict(annotation, value)
            else:
                field_values[name] = value
        return cls(**field_values)

    # Initialize the aggregated structure dynamically
    aggregated = defaultdict(lambda: defaultdict(list))

    # Aggregate all feedbacks
    for feedback in feedbacks:
        aggregate_dict(aggregated, to_dict(feedback))

    # Deduplicate lists
    deduplicate_dict(aggregated)

    # Construct a new feedback object dynamically
    return from_dict(feedback_class, aggregated)

In [38]:

# Demo: merge two sample evaluations that differ only in their introduction
# feedback. The shared sections should come back deduplicated; the introduction
# lists should contain the entries of both samples.
aggregate_feedback(
    feedbacks=[
        # First sample: every section carries the same baseline feedback.
        Evaluation(
            **{
                section: CriteraFeedback(
                    strengths=["s1", "s2"],
                    weaknesses=["w1", "w2"],
                    suggestions=["ss1, ss2"],
                )
                for section in (
                    "introduction",
                    "structure",
                    "argumentation",
                    "evidence",
                    "conclusion",
                )
            }
        ),
        # Second sample: a distinct introduction, baseline feedback elsewhere.
        Evaluation(
            introduction=CriteraFeedback(
                strengths=["s11", "s22"],
                weaknesses=["w11", "w22"],
                suggestions=["ss11, ss2"],
            ),
            **{
                section: CriteraFeedback(
                    strengths=["s1", "s2"],
                    weaknesses=["w1", "w2"],
                    suggestions=["ss1, ss2"],
                )
                for section in (
                    "structure",
                    "argumentation",
                    "evidence",
                    "conclusion",
                )
            },
        ),
    ],
    feedback_class=Evaluation,
)

Evaluation(introduction=CriteraFeedback(strengths=['s1', 's2', 's11', 's22'], weaknesses=['w1', 'w2', 'w11', 'w22'], suggestions=['ss1, ss2', 'ss11, ss2']), structure=CriteraFeedback(strengths=['s1', 's2'], weaknesses=['w1', 'w2'], suggestions=['ss1, ss2']), argumentation=CriteraFeedback(strengths=['s1', 's2'], weaknesses=['w1', 'w2'], suggestions=['ss1, ss2']), evidence=CriteraFeedback(strengths=['s1', 's2'], weaknesses=['w1', 'w2'], suggestions=['ss1, ss2']), conclusion=CriteraFeedback(strengths=['s1', 's2'], weaknesses=['w1', 'w2'], suggestions=['ss1, ss2']))