# AI-content Collector 





This notebook was created to automate the process of generating AI responses using a predefined prompt. All of the responses would be compiled to form a part of the unified dataset. The two core models implemented in this program were ChatGPT and Gemini. Other models would be tested and configured in the later phase of the project.

## Import necessary libraries

In [2]:
import requests
import os
import csv
from openai import OpenAI
import config  # We'll keep this temporarily for backward compatibility
from  abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any
import time
import json
from datetime import datetime
import warnings
from google import genai
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

warnings.filterwarnings("ignore")

## Base collector class

This is the base class for all data collectors

In [3]:
class AICollector(ABC):
    def __init__(self, client ,delay: float = 1.0):
        self.client = client
        self.delay = delay


    @abstractmethod
    def api_call(self, prompt) -> Dict[str, Any]:
        pass

    @abstractmethod
    def format_record(self, record: Dict) -> Dict[str, Any]:
        "Trasform the responses into the standard format"
        pass

    def save_to_csv(self, filename ,records:List[Dict[str, Any]], mode: str):
        """
        Save records incrementally to a CSV file.
        Handles nested directory creation.
        """
        fieldnames = ["ID", "Text", "OriginLabel", "SubmissionType", "Domain", "Timestamp", "PromptID", "GeneratedModel" ]

        os.makedirs('data', exist_ok=True)
        filepath = os.path.join('data', filename)

        if mode == 'w' or not os.path.exists(filepath):   
            with open(filepath, 'w', newline='', encoding='utf-8') as file:
                
                writer = csv.DictWriter(file, fieldnames=fieldnames, quoting=csv.QUOTE_ALL)
                if records:
                    writer.writerows(records)

        else:
            with open(filepath, 'a', newline = '', encoding = 'utf-8') as file:
                writer = csv.DictWriter(file, fieldnames=fieldnames, quoting=csv.QUOTE_ALL)
                if records:
                    writer.writerows(records)
            
        # Add this method to your AICollector base class
    
    def clean_json_content(self, content: str) -> str:
        """
        Clean the generated content by removing markdown code block syntax.
        """
        # Remove markdown code block markers
        
        if content.startswith('```json'):
            content = content[7:]  # Remove '```json'
        elif content.startswith('```'):
            content = content[3:]   # Remove '```'
        
        if content.endswith('```'):
            content = content[:-3]  # Remove closing '```'
        
        # Strip any leading/trailing whitespace and newlines
        content = content.strip()
        
        return content
    
    def save_to_json(self, filename, records: Any, mode: str):
        """Save the responses incrementally to a JSON file.""" 
        os.makedirs('newdata', exist_ok=True)
        filepath = os.path.join('newdata', filename)
        
        # Clean the content if it's a string (AI response)
        if isinstance(records, str):
            cleaned_content = self.clean_json_content(records)
            try:
                # Parse the cleaned content to ensure it's valid JSON
                records = json.loads(cleaned_content)
            except json.JSONDecodeError as e:
                print(f"Error parsing JSON for {filename}: {e}")
                # Save as raw text for debugging
                with open(filepath.replace('.json', '_raw.txt'), 'w', encoding='utf-8') as f:
                    f.write(cleaned_content)
                return
    
        if mode == 'w' or not os.path.exists(filepath):
            with open(filepath, 'w', encoding='utf-8') as file:
                json.dump(records, file, indent=4, ensure_ascii=False)
        else:
            with open(filepath, 'a', encoding='utf-8') as file:
                json.dump(records, file, indent=4, ensure_ascii=False)
    
    def batch_generate(self, promptfile ,datapath: str):
        """
        Generate responses in batches to avoid rate limits.
        """
        try:
            with open(promptfile, 'r') as f:
                prompt_template = f.read()  
            
            for filename in os.listdir(datapath):
                filepath = os.path.join(datapath, filename)  
                
                if not filename.endswith('.json'):
                    continue
                    
                with open(filepath, 'r', encoding='utf-8') as f:
                    file_data = json.load(f)
                
                current_prompt = prompt_template.replace('[file_to_write]', json.dumps(file_data))
                
                response = self.api_call(current_prompt)
                time.sleep(self.delay)
                
                if not response:
                    print(f"No response for file: {filename}")
                    continue
                    
                record = self.format_record(response)
                filename = os.path.splitext(filename)[0]
                new_filename = f"{filename}_final.json"
                self.save_to_json(new_filename, record, 'w')
                print(f"Processed file: {filename}, saved as: {new_filename}")

        except Exception as e:
            print(f'Error in batch processing: {e}')


## ChatGPT Collector

In [4]:
from typing import Any, List


class ChatGPTCollector(AICollector):
    def __init__(self, delay: float = 1.0):
        """ Initialize the ChatGPTCollector with a base schema and delay."""
        # Get API key from environment variable with fallback to config
        api_key = os.environ.get("OPENAI_API_KEY") or config.OPENAI_API_KEY
        self.client = OpenAI(api_key=api_key)
        super().__init__(self.client,  delay)
        
    
    def api_call(self, prompt):
        """
        Call the OpenAI API with the provided prompt.
        """
        try:
            response = self.client.chat.completions.create(
                model = "gpt-4o",
                messages = [{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error during API call: {e}")
            return str(e)
        
    def format_record(self, response: str) -> str:
        """
        Transform the response into the standard format.
        Returns the raw content string for further processing.
        """
        return response
    
    

## GeminiCollector

In [5]:
class GeminiCollector(AICollector):
    def __init__(self, delay: float = 1.0):
        """ Initialize the GeminiCollector with a base schema and delay."""
        # Get API key from environment variable with fallback to config
        api_key = os.environ.get("GEMINI_API_KEY") or config.GEMINI_API_KEY
        self.client = genai.Client(api_key=api_key)
        super().__init__(self.client, delay)
    
    def api_call(self, prompt):
        """
        Call the OpenAI API with the provided prompt.
        """
        try:
            response = self.client.models.generate_content(
            model="gemini-2.5-pro", contents=prompt
        )
            return response.text
        except Exception as e:
            print(f"Error during API call: {e}")
            return ''
            
    def format_record(self, response: str) -> str:
        return response

In [6]:
def validate_schema(file_path: str, base_schema_path: str) -> Dict[str, Any]:
    """
    Validates a JSON file against the base schema structure.
    
    Args:
        file_path: Path to the JSON file to validate
        base_schema_path: Path to the base schema JSON file
        
    Returns:
        Dictionary with validation results including:
        - is_valid: Boolean indicating if the schema is valid
        - errors: List of validation error messages
    """
    # Load the files
    try:
        with open(base_schema_path, 'r', encoding='utf-8') as f:
            base_schema = json.load(f)
    except Exception as e:
        return {"is_valid": False, "errors": [f"Failed to load base schema: {str(e)}"]}
    
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            test_schema = json.load(f)
    except Exception as e:
        return {"is_valid": False, "errors": [f"Failed to load test file: {str(e)}"]}
    
    # Initialize results
    results = {"is_valid": True, "errors": []}
    
    # Check required top-level keys
    required_top_keys = {"domain", "prompt", "rubric", "submissions"}
    missing_keys = required_top_keys - set(test_schema.keys())
    if missing_keys:
        results["is_valid"] = False
        results["errors"].append(f"Missing required top-level keys: {', '.join(missing_keys)}")
    
    # Check rubric structure if it exists
    if "rubric" in test_schema:
        if not isinstance(test_schema["rubric"], dict):
            results["is_valid"] = False
            results["errors"].append("'rubric' must be a dictionary")
        else:
            # Check for rubric_id
            if "rubric_id" not in test_schema["rubric"]:
                results["is_valid"] = False
                results["errors"].append("Missing 'rubric_id' in rubric")
                
            # Check criteria
            if "criteria" not in test_schema["rubric"]:
                results["is_valid"] = False
                results["errors"].append("Missing 'criteria' in rubric")
            elif not isinstance(test_schema["rubric"]["criteria"], list):
                results["is_valid"] = False
                results["errors"].append("'criteria' must be a list")
            else:
                # Check each criterion
                for i, criterion in enumerate(test_schema["rubric"]["criteria"]):
                    if not isinstance(criterion, dict):
                        results["is_valid"] = False
                        results["errors"].append(f"Criterion {i} must be a dictionary")
                        continue
                        
                    # Check required criterion keys
                    criterion_keys = {"criterion_id", "name", "description", "performance_descriptors"}
                    missing_criterion_keys = criterion_keys - set(criterion.keys())
                    if missing_criterion_keys:
                        results["is_valid"] = False
                        results["errors"].append(f"Criterion {i} is missing keys: {', '.join(missing_criterion_keys)}")
                    
                    # Check performance_descriptors structure
                    if "performance_descriptors" in criterion:
                        if not isinstance(criterion["performance_descriptors"], dict):
                            results["is_valid"] = False
                            results["errors"].append(f"'performance_descriptors' in criterion {i} must be a dictionary")
                        elif len(criterion["performance_descriptors"]) == 0:
                            results["is_valid"] = False
                            results["errors"].append(f"'performance_descriptors' in criterion {i} cannot be empty")
                        # We don't validate specific performance descriptor keys as they can vary
    
    # Check submissions structure if it exists
    if "submissions" in test_schema:
        if not isinstance(test_schema["submissions"], list):
            results["is_valid"] = False
            results["errors"].append("'submissions' must be a list")
        else:
            # Check each submission
            for i, submission in enumerate(test_schema["submissions"]):
                if not isinstance(submission, dict):
                    results["is_valid"] = False
                    results["errors"].append(f"Submission {i} must be a dictionary")
                    continue
                
                # Check required submission keys
                submission_keys = {"quality", "llm_questions", "llm_answers", "final_submission", "feedback"}
                missing_submission_keys = submission_keys - set(submission.keys())
                if missing_submission_keys:
                    results["is_valid"] = False
                    results["errors"].append(f"Submission {i} is missing keys: {', '.join(missing_submission_keys)}")
                
                # Check llm_questions and llm_answers are lists with matching lengths
                if "llm_questions" in submission and "llm_answers" in submission:
                    if not isinstance(submission["llm_questions"], list):
                        results["is_valid"] = False
                        results["errors"].append(f"'llm_questions' in submission {i} must be a list")
                    if not isinstance(submission["llm_answers"], list):
                        results["is_valid"] = False
                        results["errors"].append(f"'llm_answers' in submission {i} must be a list")
                    if (isinstance(submission["llm_questions"], list) and 
                        isinstance(submission["llm_answers"], list) and 
                        len(submission["llm_questions"]) != len(submission["llm_answers"])):
                        results["is_valid"] = False
                        results["errors"].append(f"'llm_questions' and 'llm_answers' in submission {i} must have the same length")
                
                # Check feedback structure
                if "feedback" in submission:
                    if not isinstance(submission["feedback"], dict):
                        results["is_valid"] = False
                        results["errors"].append(f"'feedback' in submission {i} must be a dictionary")
    
    return results

def validate_directory(directory_path: str, base_schema_path: str) -> Dict[str, Dict[str, Any]]:
    """
    Validates all JSON files in a directory against the base schema.
    
    Args:
        directory_path: Path to directory containing JSON files to validate
        base_schema_path: Path to the base schema JSON file
        
    Returns:
        Dictionary mapping file names to their validation results
    """
    results = {}
    
    # Validate each JSON file in the directory
    for file_name in os.listdir(directory_path):
        if file_name.endswith('.json'):
            file_path = os.path.join(directory_path, file_name)
            results[file_name] = validate_schema(file_path, base_schema_path)
    
    return results
    
   

## Main Program

In [None]:
#Use the ChatGPTCollector to generate responses
#gptcollector = ChatGPTCollector()

#gptcollector.batch_generate('prompt.txt', 'data')


#Use the GeminiCollector to generate responses
gemini_collector = GeminiCollector()

gemini_collector.batch_generate('prompt.txt', 'data')


### Validate the results

In [None]:
# Directory validation
dir_results = validate_directory("newdata", 'sample_dataset.json')
for file_name, result in dir_results.items():
    print(f"{file_name}: {'✓' if result['is_valid'] else '✗'}")
    if not result['is_valid']:
        for error in result['errors']:
            print(f"  - {error}")

engineer_rubric13_final.json: ✓
engineer_rubric16_final.json: ✓
engineer_rubric11_final.json: ✓
engineer_rubric14_final.json: ✓
engineer_rubric19_final.json: ✓
engineer_rubric20_final.json: ✓
engineer_rubric17_final.json: ✓
engineer_rubric12_final.json: ✓
engineer_rubric15_final.json: ✓
engineer_rubric18_final.json: ✓
