In [1]:
from openai import OpenAI
import dotenv
import os
import json
import pandas as pd

from time import time
from datetime import datetime

dotenv.load_dotenv("../.env")

True

In [2]:
# logging setup
import logging
# getLogger() with no name returns the root logger; INFO level lets the
# per-test progress messages emitted during evaluation through.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [3]:
class Question():
    """A closed-domain question asked to a model about the dataset.

    Attributes:
        id: Short identifier, prefixed to the question text in the prompt.
        text: The question text.
        answers_domain: Sequence of allowed answer strings.
        correct_answer_index: Index into ``answers_domain`` of the expected answer.
        context: Extra context for the question; stored unchanged.
    """

    def __init__(self, id, text, answers_domain, correct_answer_index, context):
        self.id = id
        self.text = text
        self.answers_domain = answers_domain
        self.correct_answer_index = correct_answer_index
        self.context = context

    def is_correct(self, response):
        """Check whether ``response`` matches the expected answer.

        The comparison ignores surrounding whitespace and letter case.

        Args:
            response: The answer produced by the model.

        Returns:
            True when ``response`` equals the expected answer, False otherwise.
            A non-string response (for example ``None`` from a missing JSON
            field) is never correct and yields False instead of raising.
        """
        if not isinstance(response, str):
            return False
        expected = self.answers_domain[self.correct_answer_index]
        return response.strip().lower() == expected.strip().lower()

    def get_prompt_text(self):
        """Return the question as it appears in the prompt: ``"<id>: <text>"``."""
        return f"{self.id}: {self.text}"

    def __str__(self):
        return self.get_prompt_text()

In [4]:
class Dataset():
    """Wraps a CSV file and renders it as text in several prompt-friendly formats.

    Supported format names are the keys of ``FORMAT_DESCRIPTIONS``:
    ``'csv'``, ``'horizontal_csv'``, ``'json'`` and ``'markdown_kv'``.
    """

    # Natural-language description of each format, injected into the prompt.
    FORMAT_DESCRIPTIONS = {
        'csv': (
            "You will receive a raw plaintext dataset in CSV format.\nEach row represents a record, and each column represents an attribute of the data.\nHeader is included in the first row."
        ),

        'horizontal_csv': (
            "You will receive a raw plaintext dataset in Horizontal CSV format.\nHeader is included in the first column\nEach column represents a record, and each row represents an attribute of the data."
        ),

        'json': (
            "You will receive a raw plaintext dataset in JSON format.\nEach record is a JSON object with keys representing attributes."
        ),

        'markdown_kv': (
            "You will receive a raw plaintext dataset in Markdown Key-Value format.\nEvery record is represented as a series of key-value pairs, with each pair on a new line and records separated by a line containing three dashes '---'."
        )
    }

    def __init__(self, csv_path):
        """Load the CSV at ``csv_path`` into ``self.df``.

        Raises:
            Exception: whatever ``pandas.read_csv`` raised. The error is still
                printed, but it is re-raised so that a half-built instance
                (one without ``df``) is never handed back to the caller.
        """
        try:
            self.df = pd.read_csv(csv_path)
        except Exception as e:
            print(f"Error reading CSV file: {e}")
            raise

    # Methods to get text data in different formats

    def get_csv_data(self):
        """Return the data as CSV text with a header row and no index column."""
        return self.df.to_csv(index=False)

    def get_horizontal_csv_data(self):
        """Return the transposed data as CSV: one line per column, name first."""
        return self.df.transpose().to_csv(header=False, index=True)

    def get_json_data(self):
        """Return the data as a JSON array of record objects, indented by 4."""
        return self.df.to_json(orient="records", indent=4)

    def get_md_kv_data(self):
        """Return the data as ``key: value`` lines, each record followed by ``---``."""
        output_lines = []
        for record in self.df.to_dict(orient='records'):
            output_lines.extend(f"{key}: {value}" for key, value in record.items())
            # Separator
            output_lines.append("---")

        return "\n".join(output_lines)

    def get_formatted_data(self, format_type):
        """Return the data rendered in ``format_type``.

        Raises:
            ValueError: if ``format_type`` is not a supported format name.
        """
        renderers = {
            'csv': self.get_csv_data,
            'horizontal_csv': self.get_horizontal_csv_data,
            'json': self.get_json_data,
            'markdown_kv': self.get_md_kv_data,
        }
        renderer = renderers.get(format_type)
        if renderer is None:
            raise ValueError(f"Unsupported format type: {format_type}")
        return renderer()

    # Method to get data format description for prompting
    def get_format_description(self, format_type):
        """Return the prompt description of ``format_type``.

        Unknown format names yield a fixed fallback sentence rather than an error.
        """
        return self.FORMAT_DESCRIPTIONS.get(format_type, "No description available for this format.")
            
    

In [5]:
class QuestionPrompt():
    """Pairs a question with a dataset and renders the two prompt messages."""

    def __init__(self, question, dataset):
        self.dataset = dataset
        self.question = question

    def build_prompt(self, format_type='csv'):
        """Render the instruction prompt for ``format_type``.

        Sections are separated by a blank line and the result ends with one:
        role statement, data-format description, task, and the context
        section only when the question's context is non-empty.
        """
        sections = [
            "You are an expert in Industrial Control Systems (ICS) and Operational Technology (OT), specialized in identifying system architecture and component relationships by analyzing time-series values of PLC registers.",
            f"# Data Format\n{self.dataset.get_format_description(format_type)}",
        ]
        context_block = f"# Context\n{self.question.context}"
        sections.append(
            f"# Task\nAnalyze the provided data to determine the physical architecture and answer precisely to this question:\n{self.question.get_prompt_text()}"
        )

        # The context section goes last and is skipped entirely when empty.
        if len(self.question.context) != 0:
            sections.append(context_block)

        return "\n\n".join(sections) + "\n\n"

    def build_data_prompt(self, format_type='csv'):
        """Render the data message: a ``# Data`` heading followed by the dataset text."""
        return f"# Data\n{self.dataset.get_formatted_data(format_type)}"

In [6]:
class LogErrorCode():
    """Numeric error codes stored in test logs, each with a readable message."""

    MODEL_ERROR = 101
    JSON_PARSE_ERROR = 201
    MISSING_SHORT_ANSWER = 301

    def __init__(self, code):
        self.code = code
        # (code, message) pairs checked in order; the first equal code wins.
        known_messages = (
            (self.MODEL_ERROR, "Model returned an error."),
            (self.JSON_PARSE_ERROR, "Error parsing JSON response."),
            (self.MISSING_SHORT_ANSWER, "Missing short answer in response."),
        )
        self.message = next(
            (text for known, text in known_messages if code == known),
            "Unknown error code.",
        )

    def __str__(self):
        return "[Error {}]: {}".format(self.code, self.message)

In [7]:
# initialize OpenAI client
# The OpenAI SDK is pointed at the OpenRouter endpoint. The API key is read
# from the DENIS_KEY environment variable (presumably supplied by the .env
# file loaded in the first cell - verify); os.getenv yields None when unset.
CLIENT = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("DENIS_KEY")
)

In [17]:
# Dataset shared by the question prompts below; the path is relative to the
# notebook's working directory.
ds = Dataset("../datasets/swat/compressed_simplified-swat_plc-data-log_751-lines.csv")

In [18]:
# Q1.1: classify the ICS type. These labels are the only answers the model may return.
d = [
    'Energy & Power System',
    'Oil, Gas & Chemicals System',
    'Manufacturing System',
    'Water & Wastewater System',
    'Building Automation System',
    'Not Identifiable'
]

q_1_1 = Question(
    id="Q1.1",
    text=f"Classify the physical Industrial Control System (ICS) into one of the following categories: {d}",
    answers_domain=d,
    # The system under test is a water treatment plant. Looking the index up by
    # label keeps it right if the list is reordered (a hard-coded 2 selected
    # 'Manufacturing System').
    correct_answer_index=d.index('Water & Wastewater System'),
    context=""
)

p_1_1 = QuestionPrompt(q_1_1, ds)
print(p_1_1.build_prompt(format_type='csv'))

You are an expert in Industrial Control Systems (ICS) and Operational Technology (OT), specialized in identifying system architecture and component relationships by analyzing time-series values of PLC registers.

# Data Format
You will receive a raw plaintext dataset in CSV format.
Each row represents a record, and each column represents an attribute of the data.
Header is included in the first row.

# Task
Analyze the provided data to determine the physical architecture and answer precisely to this question:
Q1.1: Classify the physical Industrial Control System (ICS) into one of the following categories: ['Energy & Power System', 'Oil, Gas & Chemicals System', 'Manufacturing System', 'Water & Wastewater System', 'Building Automation System', 'Not Identifiable']




In [20]:
# Q1.1 Possible ICS Type Identification (hand-built prompt variant)
# This cell assigns the module-level names `domain`, `correct_answer_index`,
# `question` and `prompt`. `evaluate_models` reads `domain` and
# `correct_answer_index` as globals, and the run cell passes `prompt` and
# `question`, so whichever of these prompt cells ran last decides what is evaluated.
domain = [
    "Power Generation Plant",
    "Manufacturing Assembly Line",
    "Oil and Gas Refinery",
    "Water Purification Plant",
    "Nuclear Power Plant",
    "Not Identifiable"
]
correct_answer_index = 3 # Water Purification Plant

# NOTE(review): "phisical" is misspelled in the text sent to the model; it is
# left untouched here because it is runtime prompt text.
question = f"Q1.1: Infer, based on data provided, what type of phisical Industrial Control System (ICS) between the following options: {domain}"

# The template ends at the "## Data (CSV)" heading; the dataset text itself is
# not part of this string.
prompt = """
You are an expert in Industrial Control Systems (ICS), specialized in identifying system architecture and component relationships by analyzing time-series values of PLC registers.

## Data Format
You will receive a raw plaintext dataset in CSV form:
- The first row contains column names (register labels).
- All fields are comma-separated.
- Each following row represents the register states at a specific timestamp.

## Task
Analyze the dataset and answer the following question:
- {question}

## Data (CSV)""".format(question=question)

In [19]:
# Q2.1: number of water tanks. The answer options are the strings "0" through "10".
# Note: this rebinds `d`, which the Q1.1 cell also uses for its own options.
d = [str(i) for i in range(11)]

# NOTE(review): "catagorized" is misspelled in the context string sent to the
# model; left untouched here because it is runtime prompt text.
q_2_1 = Question(
    id="Q2.1",
    text=f"Identify how many water tanks are involved during the operations of the ICS under consideration between the following options: {d}.",
    answers_domain=d,
    correct_answer_index=3,  # d[3] == "3"
    context="The ICS is catagorized as a Water & Wastewater System."
)

p_2_1 = QuestionPrompt(q_2_1, ds)

In [22]:
# Q2.1 Tanks Number Identification (hand-built prompt variant)
# Rebinds the module-level `domain`, `correct_answer_index`, `question`,
# `context` and `prompt`; `evaluate_models` reads `domain` and
# `correct_answer_index` as globals.
# The options are the strings "1" through "10", so answer "3" sits at index 2.
domain = [str(i+1) for i in range(10)]
correct_answer_index = 3 - 1 # Three Tanks in the ICS

question = f"Q2.1: Only based on data provided, identify how many tanks are involved during the operations of the ICS under consideration between the following options: {domain}"
context = "The ICS is a simplified version of a Water Purification Plant. It produces filtered water through filtration and reverse osmosis processes."

# The template ends at the "## Data (CSV)" heading; the dataset text itself is
# not part of this string.
prompt = """
You are an expert in Industrial Control Systems (ICS), specialized in identifying system architecture and component relationships by analyzing time-series values of PLC registers.

## Data Format
You will receive a raw plaintext dataset in CSV form:
- The first row contains column names (register labels).
- All fields are comma-separated.
- Each following row represents the register states at a specific timestamp.

## Context
{context}

## Task
Analyze the dataset and answer the following question:
- {question}

## Data (CSV)""".format(context=context,question=question)

In [23]:
# Q3.1 PLCs Number Identification (hand-built prompt variant)
# Rebinds the module-level `domain`, `correct_answer_index`, `question`,
# `context` and `prompt`; `evaluate_models` reads `domain` and
# `correct_answer_index` as globals.
# The options are the strings "1" through "10", so answer "3" sits at index 2.
domain = [str(i+1) for i in range(10)]
correct_answer_index = 3 - 1 # expected answer "3" - presumably three PLCs (the earlier comment said "Three Tanks", copied from Q2.1) - TODO confirm

question = f"Q3.1: Based on provided data and context, identify how many PLCs are involved during the operations of the ICS under consideration between the following options: {domain}"
context = "The ICS is a simplified version of a Water Purification Plant. It produces filtered water through filtration and reverse osmosis processes. The operations involve three water tanks having varying capacities."

# The template ends at the "## Data (CSV)" heading; the dataset text itself is
# not part of this string.
prompt = """
You are an expert in Industrial Control Systems (ICS), specialized in identifying system architecture and component relationships by analyzing time-series values of PLC registers.

## Data Format
You will receive a raw plaintext dataset in CSV form:
- The first row contains column names (register labels).
- All fields are comma-separated.
- Each following row represents the register states at a specific timestamp.

## Context
{context}

## Task
Analyze the dataset and answer the following question:
- {question}

## Data (CSV)""".format(context=context,question=question)

In [12]:
# send prompt to model
def _build_response_format(answers_domain):
    """Build the ``response_format`` payload constraining the model's JSON reply.

    The schema is declared ``strict``. Under the OpenAI structured-output
    rules, strict schemas need a ``type`` on every property, ``items`` on every
    array and ``additionalProperties: False`` on every object, so all of these
    are spelled out here (how each provider behind OpenRouter enforces them may
    vary).

    Args:
        answers_domain: Allowed values for ``response.short_answer``.
    """
    return {
        "type": "json_schema",
        "json_schema": {
            "name": "response",
            "strict": True,
            "schema": {
                "type": "object",
                "properties": {
                    "response": {
                        "type": "object",
                        "properties": {
                            "short_answer": {
                                "type": "string",
                                "enum": list(answers_domain),
                            },
                            "confidence": {
                                "type": "number",
                                "description": "Confidence level from 0 to 1, where 1 is highest"
                            },
                            "reasoning": {
                                "type": "string",
                                "description": "Reasoning behind the answer"
                            }
                        },
                        "required": ["short_answer", "confidence", "reasoning"],
                        "additionalProperties": False
                    },
                    "limitations": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "List of limitations or uncertainties in the analysis"
                    },
                    "internal_checks": {
                        "type": "object",
                        "properties": {
                            "columns_used": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "List of columns from the dataset that were used in the analysis"
                            },
                            "assumptions_detected": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "List of assumptions made during the analysis"
                            },
                            "warnings": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "List of warnings or potential issues identified during the analysis"
                            }
                        },
                        "required": ["columns_used", "assumptions_detected", "warnings"],
                        "additionalProperties": False
                    }
                },
                "required": ["response", "limitations", "internal_checks"],
                "additionalProperties": False
            }
        }
    }


def send_prompt(model_name, question: Question, prompt: QuestionPrompt, format_type):
    """Send one question to ``model_name`` and return the raw reply text.

    The instruction prompt is sent as the system message and the formatted
    dataset as the user message; the reply is constrained to a JSON schema
    whose ``short_answer`` must be one of ``question.answers_domain``.

    Args:
        model_name: Model identifier passed to the chat-completions endpoint.
        question: Question whose ``answers_domain`` limits the short answer.
        prompt: Prompt builder providing the system and data messages.
        format_type: Dataset format name forwarded to the prompt builder.

    Returns:
        The message content (a string) on success. On ANY failure the caught
        exception object is returned instead of raised, so callers must check
        ``isinstance(result, Exception)``.
    """
    try:
        completion = CLIENT.chat.completions.create(
            model=model_name,
            messages=[
                {
                    "role": "system",
                    "content": [{"type": "text", "text": prompt.build_prompt(format_type)}]
                },
                {
                    "role": "user",
                    "content": [{"type": "text", "text": prompt.build_data_prompt(format_type)}]
                },
            ],
            response_format=_build_response_format(question.answers_domain)
        )

        raw = completion.choices[0].message.content
        if not raw:
            # An empty reply is reported through the same channel as API errors.
            raise Exception("No content in response")
        return raw

    except Exception as e:
        return e

In [20]:
# Single manual call: Q2.1 against one model, with the dataset in Markdown key-value format.
r = send_prompt("tngtech/deepseek-r1t2-chimera:free", q_2_1, p_2_1, 'markdown_kv')

# send_prompt returns either the raw reply text or the caught exception object.
print(r)

KeyboardInterrupt: 

In [25]:
def evaluate_models(models, prompt, csv_data, repetitions=10):
    '''
    Evaluate accuracy of multiple models on the given prompt and data.

    Each model is queried `repetitions` times through `send_prompt`. A reply
    counts as valid when it parses as JSON and carries a non-empty
    `response.short_answer`; valid replies are compared, ignoring case and
    surrounding whitespace, with `domain[correct_answer_index]`.

    Args:
        models: Iterable of model identifiers.
        prompt: Prompt forwarded unchanged to `send_prompt`.
        csv_data: Dataset text forwarded unchanged to `send_prompt`.
        repetitions: Number of queries per model.

    Returns:
        One dict per model with the keys "model", "valid_tests_number",
        "correct_answers_counter", "wrong_answers_counter", "errors_counter",
        "accuracy" (percentage over valid tests, 0.0 when there are none),
        "tests" (per-run logs) and "model_evaluation_time" (seconds, 1 decimal).

    Notes:
        - `domain` and `correct_answer_index` are read from module globals.
        - NOTE(review): the `send_prompt` visible in this notebook takes
          (model_name, question, prompt, format_type); the three-argument call
          below matches a different signature. Confirm which definition is
          live in the kernel before running.
    '''
    models_log = []

    for model in models:
        logging.info(f"Evaluating model: {model}")

        # Resolved once per model so a missing global fails before any request is sent.
        expected_answer = domain[correct_answer_index].strip().lower()

        correct_answers_counter = 0
        errors_counter = 0
        tests_log = []

        model_start_time = time()
        for i in range(repetitions):
            logging.info(f"Test {i+1}/{repetitions} for model {model}")
            test_log = {"run": i, "short_answer": None, "confidence": None, "error_code": None}
            # Registered up front; the branches below only fill in its fields.
            tests_log.append(test_log)

            # send prompt to model and get response
            response = send_prompt(model, prompt, csv_data)

            # check if there was a model error
            if isinstance(response, Exception):
                errors_counter += 1
                test_log["error_code"] = LogErrorCode.MODEL_ERROR
                logging.error(f"Model error for {model} on test {i+1}: {response}")
                continue

            # parse JSON response
            try:
                json_response = json.loads(response)
                response_data = json_response.get('response', {})
                raw_answer = response_data.get('short_answer', '')
                confidence = response_data.get('confidence', 0)
            except (TypeError, AttributeError, json.JSONDecodeError) as e:
                # AttributeError covers valid JSON of the wrong shape (top level
                # or "response" not an object); it used to abort the whole run.
                errors_counter += 1
                test_log["error_code"] = LogErrorCode.JSON_PARSE_ERROR
                logging.error(f"JSON parse error for {model} on test {i+1}: {e}")
                continue

            # A JSON null counts as missing; other non-string values are
            # compared by their text form instead of crashing on .strip().
            short_answer = "" if raw_answer is None else str(raw_answer).strip()

            # check missing short answer
            if not short_answer:
                errors_counter += 1
                test_log["error_code"] = LogErrorCode.MISSING_SHORT_ANSWER
                logging.error(f"Missing short answer for {model} on test {i+1}")
                continue

            # correct answer check
            if short_answer.lower() == expected_answer:
                correct_answers_counter += 1

            # update log
            test_log["short_answer"] = short_answer
            test_log["confidence"] = confidence
            logging.info(f"Test {i+1} for model {model} ended without errors!")

        model_end_time = time()

        # Accuracy is computed over valid tests only, so errors do not count as wrong answers.
        valid_tests_number = repetitions - errors_counter
        models_log.append({
            "model": model,
            "valid_tests_number": valid_tests_number,
            "correct_answers_counter": correct_answers_counter,
            "wrong_answers_counter": valid_tests_number - correct_answers_counter,
            "errors_counter": errors_counter,
            "accuracy": (correct_answers_counter / valid_tests_number) * 100.0 if valid_tests_number > 0 else 0.0,
            "tests": tests_log,
            "model_evaluation_time": round(model_end_time - model_start_time, 1),
        })

    return models_log
    

In [26]:
# tested models
models = [
    #"x-ai/grok-4.1-fast:free", # now is only for paying users
    #"meta-llama/llama-3.3-70b-instruct:free",
    #"nvidia/nemotron-nano-12b-v2-vl:free",
    "kwaipilot/kat-coder-pro:free",
    "tngtech/deepseek-r1t2-chimera:free",
    "mistralai/devstral-2512:free",
    #"openrouter/bert-nebulon-alpha",
    #"z-ai/glm-4.5-air:free"
    #"amazon/nova-2-lite-v1:free", # No JSON responses
    #"qwen/qwen3-coder:free",
    #"google/gemma-3-27b-it:free",
    #"openai/gpt-oss-20b:free",
    #"meituan/longcat-flash-chat:free",
    #"allenai/olmo-3-32b-think:free",
    #"alibaba/tongyi-deepresearch-30b-a3b:free",
    #"cognitivecomputations/dolphin-mistral-24b-venice-edition:free"
]

Vorrei creare un metodo che mi permetta di valutare COMPLETAMENTE una domanda, quindi restituire i risultati della domanda in questione per ogni:
- modello
- dataset
- formato del dataset

Sarebbe da creare una funzione `evaluate(question, models, datasets, formats, n, [anon])`, che restituisca i risultati in uno o più file JSON.

In [28]:
# Run the evaluation for every model in `models` and save the results as JSON.
# NOTE(review): `anonymized_text_data`, `text_data` and `tested_dataset_name`
# are not assigned in the cells shown in this notebook; they must already
# exist in the kernel for this cell to run - confirm where they come from.

# Read the clock once so the date and hour parts of the file name cannot
# disagree when a run starts right at midnight.
started_at = datetime.now()
start_date = started_at.strftime('%Y_%m_%d')
start_hour = started_at.strftime('%H_%M_%S')
start = time()

repetitions = 20
# simple switch for anonymized dataset evaluation
anon = True
evaluated_text = anonymized_text_data if anon else text_data
models_log = evaluate_models(models, prompt, evaluated_text, repetitions)

end = time()

evaluation_time = end - start

evaluation = {
    "question": question,
    "tested_models": models,
    "tested_dataset_name": tested_dataset_name,
    "dataset_header_anonymization": anon,
    "total_evaluation_time": round(evaluation_time, 1),
    "models_log": models_log,
}

# One folder per day, one file per run (named after the start time).
dir_responses_path = f"../responses/{start_date}/"
os.makedirs(dir_responses_path, exist_ok=True)
# os.path.join avoids the doubled "/" that plain concatenation gave after the
# trailing slash of the directory path.
output_path = os.path.join(dir_responses_path, f"{start_date}-{start_hour}_models_evaluation.json")
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(evaluation, f, indent=4)

INFO:root:Evaluating model: kwaipilot/kat-coder-pro:free
INFO:root:Test 1/20 for model kwaipilot/kat-coder-pro:free
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Test 1 for model kwaipilot/kat-coder-pro:free ended without errors!
INFO:root:Test 2/20 for model kwaipilot/kat-coder-pro:free
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Test 2 for model kwaipilot/kat-coder-pro:free ended without errors!
INFO:root:Test 3/20 for model kwaipilot/kat-coder-pro:free
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Test 3 for model kwaipilot/kat-coder-pro:free ended without errors!
INFO:root:Test 4/20 for model kwaipilot/kat-coder-pro:free
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Test 4 for model kwaipilot/kat-coder-pro:free ended without errors!
INFO:root:Test 5/20