In [1]:
import yaml


def load_config(filename: str) -> object:
    """Read a YAML file and return its parsed content.

    Args:
        filename: Path of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file's content.

    Raises:
        OSError: If the file cannot be opened.
    """
    # The context manager closes the handle deterministically; the previous
    # `open(filename).read()` left that to the garbage collector.
    with open(filename) as config_file:
        return yaml.safe_load(config_file)

In [2]:
def load_template(filename: str) -> str:
    """Return the full text of the prompt template file.

    Args:
        filename: Path of the template file to read.

    Returns:
        The file's content as a single string.

    Raises:
        OSError: If the file cannot be opened.
    """
    # The context manager closes the handle deterministically; the previous
    # `open(filename, 'r').read()` left that to the garbage collector.
    with open(filename, 'r') as template_file:
        return template_file.read()

In [3]:
from jinja2 import Template
from openai import OpenAI
from settings import OPENAI_API_KEY, OPENAI_BASE_URL, MAX_TOKENS_OUTPUT, PROMPT_TEMPLATE


class Expert:
    client = None

    def __init__(self, model, base_url=None, api_key=None, prompt_template=None):
        self.model = model
        self.base_url = base_url or OPENAI_BASE_URL
        self.api_key = api_key or OPENAI_API_KEY
        self.prompt_template = prompt_template or PROMPT_TEMPLATE

        if self.client is None:
            self.init_client()

    def init_client(self):
        self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)

    def rate_sample(self, sample: str) -> int | None:
        prompt = Template(self.prompt_template).render(context=sample)
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt}
        ]
        response = self.client.chat.completions.create(
            model=self.model, messages=messages,
            temperature=0, n=1, max_tokens=MAX_TOKENS_OUTPUT
        )

        # Extract the response text and parse it as an integer rating from 1 to 5
        response_text = response.choices[0].message.content
        try:
            rating = int(''.join(filter(str.isdigit, response_text)))
        except ValueError:
            print("Invalid rating value in response: {}".format(response_text))
            return None

        # Ensure the rating is within the valid range [1, 5]
        if rating < 1: rating = 1
        if rating > 5: rating = 5

        return rating

In [4]:
import json
import concurrent.futures
from datasets import load_dataset
import tiktoken

# Build the panel of experts: one Expert per entry under the `experts` key of
# experts.yml. Each entry is unpacked into Expert's keyword arguments, and every
# expert receives the same prompt template text.
config = load_config('experts.yml')
template = load_template('prompt_template.txt')
experts = [Expert(**expert, prompt_template=template) for expert in config['experts']]
# Tokenizer used further down to count a sample's tokens before rating it.
# NOTE(review): this is the GPT-2 encoding, so the count is presumably only an
# approximation for experts whose models use a different tokenizer — confirm.
encoding = tiktoken.get_encoding("gpt2")  # Load the Tiktoken encoding model for counting tokens


# Module-level wrapper around Expert.rate_sample, used as the per-expert unit
# of work when experts are queried concurrently.
def process_model(expert: Expert, data: str) -> int | None:
    """Return `expert`'s rating for `data`.

    Args:
        expert: The expert to query.
        data: The sample text to rate.

    Returns:
        Whatever ``expert.rate_sample(data)`` returns: an integer rating, or
        ``None`` when the expert's reply could not be parsed.
    """
    return expert.rate_sample(data)


# Load the dataset. The split expression `train[:10]` restricts the load to a
# 10-row slice of the train split.
dataset = load_dataset("IlyaGusev/saiga_scored", split="train[:10]")  # Replace with your desired dataset name

# Get the data to rate from the dataset: each sample's "messages" field is
# serialized to one JSON string. ensure_ascii=False keeps non-ASCII text
# readable instead of emitting \uXXXX escapes.
data_to_rate = [json.dumps(sample["messages"], ensure_ascii=False) for sample in dataset]

In [5]:
import concurrent.futures
from settings import MAX_TOKENS_INPUT


def rate_sample(sample: str, experts: list, max_workers: int = 4):
    """Collect one rating per expert for `sample`, querying experts in parallel.

    Args:
        sample: Text to rate; its token count is checked against
            ``MAX_TOKENS_INPUT`` first.
        experts: Experts to query. Each must expose a ``model`` attribute and be
            accepted by ``process_model``.
        max_workers: Size of the thread pool. Values below 1 are raised to 1.

    Returns:
        A dict mapping each expert's ``model`` to its rating (``None`` when that
        expert gave no usable rating), or ``None`` when the sample exceeds the
        token limit and is skipped. Experts that share a ``model`` value share
        a key, so the later one in `experts` overwrites the earlier one.
    """
    # Count the number of tokens in the sample
    num_tokens = len(encoding.encode(sample))
    if num_tokens > MAX_TOKENS_INPUT:
        print(f"Skipping sample: Size exceeds limit ({num_tokens} tokens)")
        return None

    print(f"Processing sample: {sample}")

    # ThreadPoolExecutor raises ValueError for max_workers <= 0, which is what a
    # caller passing `len(experts)` produces for an empty expert list.
    if max_workers is not None and max_workers < 1:
        max_workers = 1

    # Define a function to be executed by each worker
    def rate_expert(expert):
        return expert.model, process_model(expert, sample)

    # executor.map yields results in the order of `experts`; the iterator is
    # consumed inside the `with` block, before the pool shuts down.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        ratings_by_expert = dict(executor.map(rate_expert, experts))

    print("Ratings:", ratings_by_expert)

    return ratings_by_expert


# Pool size for rate_sample: one worker per expert. Switch to the commented-out
# line below to use a single worker instead.
# max_workers = 1
max_workers = len(experts)

# Rate every sample. The result list is index-aligned with data_to_rate and
# holds None for samples that rate_sample skipped for exceeding the token limit.
ratings_by_experts = []
for i, data in enumerate(data_to_rate):
    ratings_by_expert = rate_sample(data, experts, max_workers=max_workers)
    ratings_by_experts.append(ratings_by_expert)

Processing sample: [{"content": "Ideas generales de la Seguridad Social (qué es, para qué sirve, qué has oído\ndecir de ella...)", "role": "user"}, {"content": "La Seguridad Social es un sistema integral diseñado para proteger a los ciudadanos frente a diversas situaciones de vulnerabilidad o necesidad, tales como la vejez, el desempleo, la enfermedad, la discapacidad, la muerte (protección a los dependientes del fallecido) y la maternidad. Este sistema se basa en principios de solidaridad y redistribución de recursos, donde los trabajadores, empleadores y, en algunos casos, el Estado, contribuyen financieramente para sostenerlo.\n\nLa Seguridad Social tiene varios propósitos fundamentales:\n\n1. **Protección contra la pérdida de ingresos**: Asegura un ingreso básico a las personas que no pueden trabajar debido a vejez, enfermedad, discapacidad, maternidad o desempleo.\n\n2. **Acceso a servicios de salud**: Ofrece cobertura de salud para el tratamiento de enfermedades y condiciones méd

In [6]:
import statistics


def classify_rating(ratings_by_experts: list, quorum: int, good_threshold: float = 3.5):
    """Average each sample's expert ratings and label the sample "Good" or "Bad".

    Args:
        ratings_by_experts: One entry per sample: a dict mapping an expert key
            to its rating (``None`` for a missing rating), or ``None`` for a
            sample that was never rated.
        quorum: Minimum number of valid ratings for ``majority_vote`` to be True.
        good_threshold: A sample is "Good" when its average rating is strictly
            greater than this value.

    Returns:
        A list with one dict per classified sample, with keys ``sample_index``
        (1-based position in `ratings_by_experts`), ``average_rating``,
        ``classification`` and ``majority_vote``. Returns ``None`` when no
        sample could be classified. Samples with a ``None`` entry or without
        any valid rating are left out.
    """
    classified_samples = []

    for sample_number, ratings in enumerate(ratings_by_experts, start=1):

        # Skip empty ratings
        if ratings is None:
            continue

        # Keep only the valid (non-None) ratings
        valid_ratings = [rating for rating in ratings.values() if rating is not None]

        # With nothing to average, statistics.mean would raise StatisticsError,
        # so the sample really is skipped here.
        if not valid_ratings:
            print(f"Sample {sample_number}: Not enough valid ratings. Skipping classification.")
            continue

        # Below quorum the sample is still classified, but flagged through
        # majority_vote=False so callers can filter it out.
        majority_vote = len(valid_ratings) >= quorum
        if not majority_vote:
            print(f"Sample {sample_number}: Not enough valid ratings for a quorum "
                  f"({len(valid_ratings)}/{quorum}).")

        # Calculate the average rating
        average_rating = statistics.mean(valid_ratings)
        classification = "Good" if average_rating > good_threshold else "Bad"

        print(f"Sample {sample_number}: AVG Rating: {average_rating}, Class: {classification}")
        print(f"Ratings: {json.dumps(valid_ratings)}")
        print(f"Majority Vote Achieved: {majority_vote}")
        print()

        classified_samples.append({
            "sample_index": sample_number,
            "average_rating": average_rating,
            "classification": classification,
            "majority_vote": majority_vote
        })

    return classified_samples if classified_samples else None


# Quorum for the vote over the expert ratings collected above.
quorum = (len(experts) // 2) + 1  # (50%+1) calculate the minimum number of experts to get a majority vote

# Calculate the average rating and classify it as "bad" or "good" for each sample;
# the quorum determines each result's `majority_vote` flag.
results = classify_rating(ratings_by_experts, quorum)
results

Sample 1: AVG Rating: 4.25, Class: Good
Ratings: [4, 5, 4, 4]
Majority Vote Achieved: True

Sample 2: AVG Rating: 4.4, Class: Good
Ratings: [5, 4, 5, 4, 4]
Majority Vote Achieved: True

Sample 3: AVG Rating: 4.25, Class: Good
Ratings: [5, 4, 4, 4]
Majority Vote Achieved: True

Sample 4: AVG Rating: 4.5, Class: Good
Ratings: [5, 5, 4, 4]
Majority Vote Achieved: True

Sample 5: AVG Rating: 4.5, Class: Good
Ratings: [5, 5, 4, 4]
Majority Vote Achieved: True

Sample 7: AVG Rating: 4.5, Class: Good
Ratings: [5, 5, 4, 4]
Majority Vote Achieved: True

Sample 8: AVG Rating: 4.8, Class: Good
Ratings: [5, 5, 4, 5, 5]
Majority Vote Achieved: True

Sample 10: AVG Rating: 4.4, Class: Good
Ratings: [5, 5, 4, 4, 4]
Majority Vote Achieved: True



[{'sample_index': 1,
  'average_rating': 4.25,
  'classification': 'Good',
  'majority_vote': True},
 {'sample_index': 2,
  'average_rating': 4.4,
  'classification': 'Good',
  'majority_vote': True},
 {'sample_index': 3,
  'average_rating': 4.25,
  'classification': 'Good',
  'majority_vote': True},
 {'sample_index': 4,
  'average_rating': 4.5,
  'classification': 'Good',
  'majority_vote': True},
 {'sample_index': 5,
  'average_rating': 4.5,
  'classification': 'Good',
  'majority_vote': True},
 {'sample_index': 7,
  'average_rating': 4.5,
  'classification': 'Good',
  'majority_vote': True},
 {'sample_index': 8,
  'average_rating': 4.8,
  'classification': 'Good',
  'majority_vote': True},
 {'sample_index': 10,
  'average_rating': 4.4,
  'classification': 'Good',
  'majority_vote': True}]