In [None]:
# !pip3 install torch --index-url https://download.pytorch.org/whl/cu124
# !pip3 install bitsandbytes
# !pip3 install --upgrade accelerate
# !pip install transformers[sentencepiece]

# import torch
# print(torch.cuda.is_available())
# print(torch.cuda.device_count())
# print(torch.cuda.get_device_name(0))
# print(torch.cuda.current_device())

In [None]:
# =======================
# CAESAR CIPHER - NATURAL, RANDOM, AND GREEK TEXT INPUTS
# =======================
import os
import re
import json
import random
import torch
import warnings
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from datetime import datetime

# =======================
# Configuration Section
# =======================
# Experiment configuration for the Caesar-cipher cell. The functions defined
# below in this cell read their settings from this dict.
CONFIG = {
    # File paths and directories.
    # One INPUT_FILE / OUTPUT_DIR / CONFIG_FILE entry is active at a time; the
    # commented-out lines are the alternatives for the other text sets.
    'PATHS': {
        # 'INPUT_FILE': './data/original/natural.txt',
        # 'INPUT_FILE': './data/original/random.txt',
        'INPUT_FILE': './data/original/natural-greek.txt',
        # 'OUTPUT_DIR': './data/encoded/caesar-cipher/natural/',
        # 'OUTPUT_DIR': './data/encoded/caesar-cipher/random/',
        'OUTPUT_DIR': './data/encoded/caesar-cipher/greek/',
        # CONFIG_FILE is opened in append mode: one log line is added per
        # (model, prompt type) run.
        # 'CONFIG_FILE': './caesar-cipher-experiments-log-natural-text.txt'
        # 'CONFIG_FILE': './caesar-cipher-experiments-log-random-text.txt'
        'CONFIG_FILE': './caesar-cipher-experiments-log-greek-text.txt'
    },
    
    # Model configurations: local checkpoint directories, processed in order.
    # NOTE: the model name is derived from the path, and the non-llama branch
    # takes the second-to-last '/'-separated component, so the trailing slash
    # matters here.
    'MODELS': [
        # '/storage/ukp/shared/shared_model_weights/models--llama-3/8B-Instruct/',
        # '/storage/ukp/shared/shared_model_weights/models--llama-3/70B-Instruct/',
        '/storage/ukp/shared/shared_model_weights/models--mistralai--Mistral-7B-Instruct-v0.3/',
        '/storage/ukp/shared/shared_model_weights/models--mistralai--Mixtral-8x7B-Instruct-v0.1/'
    ],
    
    # Experiment settings
    'EXPERIMENT': {
        # Number of leading lines taken from INPUT_FILE.
        'TOTAL_LINES': 25,
        # Every shift in this list is run against every model.
        'SHIFT_VALUES': [12],
        # Passed to model.generate() as `temperature`.
        'TEMPERATURE': 0.01,
        # Seeds `random`, `torch` and all CUDA devices in setup_environment().
        'SEED': 2266
    },
    
    # Model generation settings
    'GENERATION': {
        # max_new_tokens per prompt type; the keys must match those of PROMPTS
        # because they are looked up by prompt type.
        'MAX_TOKENS': {
            'base': 64,
            'cot-like': 512,
            # NOTE(review): 2024 may have been meant as 2048 - confirm.
            'default': 2024,
        }
    },
    
    # Prompt templates. "$SHIFT$" is replaced with the shift value and the
    # plain text is appended directly after the template.
    'PROMPTS': {
        'base': "Encode the following text to a Caesar cipher. The shift is $SHIFT$. Output the cipher text only.\n\nplain text: ",
        'cot-like': "Encode the following text to a Caesar cipher. The shift is $SHIFT$. Output a lookup table and the cipher text in a Python dictionary: {\"lookup_table\": {}, \"cipher_text\":...}. Output the dictionary only.\n\nplain text: ",
        'default': "Encode the following text to a Caesar cipher. The shift is $SHIFT$.\n\nplain text: "
    },
    
    # Regex patterns for text extraction (applied with re.findall, so each
    # match is a tuple of the pattern's groups; the payload is the last group).
    'PATTERNS': {
        # Group 1: the quoted "lookup_table" key; group 2: a brace-delimited
        # table body that contains no nested closing brace.
        'LOOKUP_TABLE': r"(\"lookup_table\":|\'lookup_table\':)\s*(\{[^\}]*\})",
        # Group 1: the "cipher_text" key; group 2: its value including the
        # surrounding quote characters (possibly empty between the quotes).
        'CIPHER_TEXT': r"(\"cipher_text\":|\'cipher_text\':|`cipher_text`:)\s*(\"[^\"]*\"|'[^']*'|`[^`]*`)",
        # Group 1: one of several answer labels (case-sensitive); group 2: the
        # rest of the line after it. The first lookahead rejects a label that
        # is followed only by "?" (optionally "?:") at the end of the text; the
        # second rejects captured text starting with "plain text" or
        # "encoded text".
        'GENERAL_CIPHER': r"(Cipher text:|cipher text:|cipher_text:|shifted text:|Caesar cipher:|The encoded text is:|encoded text:|cipher text is:|answer is|Answer:|Answer is:|encoded text is|output|Solution:|The final answer is)(?!\s*\?:?$)\s*((?!plain text|encoded text).+)"
    }
}

# =======================
# Initialization
# =======================
def setup_environment():
    """Seed all random number generators and quiet known-noisy warnings.

    The seed comes from ``CONFIG['EXPERIMENT']['SEED']`` and is applied to
    Python's ``random``, to ``torch`` and to every CUDA device. Tokenizer
    parallelism is switched off through its environment variable, and the
    tokenizers fork message plus all ``UserWarning``s are filtered out.
    """
    seed = CONFIG['EXPERIMENT']['SEED']
    for apply_seed in (random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        apply_seed(seed)

    os.environ["TOKENIZERS_PARALLELISM"] = "false"

    fork_notice = "huggingface/tokenizers: The current process just got forked"
    warnings.filterwarnings("ignore", message=fork_notice)
    warnings.filterwarnings("ignore", category=UserWarning)

# =======================
# Encoder Class Definition
# =======================
class TextEncoder:
    """Prompt an 8-bit quantized causal LM and extract the cipher text it produced.

    Attributes:
        tokenizer: tokenizer loaded from ``model_path``.
        model: causal LM loaded from ``model_path`` with ``device_map="auto"``.
        temperature: value forwarded to ``model.generate``.
        model_name: short label derived from ``model_path`` (used in logs).
    """

    def __init__(self, model_path, temperature=0.01):
        """Load the tokenizer and the 8-bit quantized model from ``model_path``.

        Args:
            model_path: checkpoint directory.
            temperature: generation temperature forwarded to ``generate``.
        """
        quantization_config = BitsAndBytesConfig(
            load_in_8bit=True
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=True)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quantization_config,
            device_map="auto"
        )
        self.temperature = temperature
        self.model_name = self._get_model_name(model_path)

    def _get_model_name(self, model_path):
        """Derive a short model label from a checkpoint path.

        Paths containing "llama" keep everything after the last ``models--``
        marker with ``/`` replaced by ``_`` (a trailing slash therefore becomes
        a trailing underscore). Any other path yields its final directory
        name, whether or not the path ends with a slash.
        """
        if "llama" in model_path:
            return model_path.split('models--')[-1].replace('/', '_')
        # Strip trailing slashes first so "dir/name" and "dir/name/" agree;
        # indexing [-2] on the raw split only worked with a trailing slash.
        return model_path.rstrip('/').split('/')[-1]

    def encode(self, prompt_template, text, shift, max_tokens, prompt_type):
        """Fill ``$SHIFT$`` in the template, append ``text`` and run generation.

        Returns:
            The ``(lookup_table, cipher_text)`` tuple from ``generate_text``.
        """
        prompt = prompt_template.replace("$SHIFT$", str(shift)) + text
        return self.generate_text(prompt, max_tokens, prompt_type)

    def generate_text(self, prompt, max_tokens, prompt_type):
        """Generate a completion for ``prompt`` and post-process it.

        Args:
            prompt: full prompt string.
            max_tokens: passed as ``max_new_tokens``.
            prompt_type: selects the post-processing path ('cot-like' or other).

        Returns:
            ``(lookup_table, cipher_text)`` as produced by
            ``_process_generated_text``.
        """
        inputs = self.tokenizer(prompt, return_tensors="pt")
        input_ids = inputs["input_ids"].to(self.model.device)
        attention_mask = inputs["attention_mask"].to(self.model.device)

        # NOTE(review): `temperature` is passed without `do_sample=True`; per
        # the transformers generation docs temperature only affects sampling,
        # so this presumably runs greedy decoding - confirm that is intended.
        with torch.no_grad():
            outputs = self.model.generate(
                input_ids,
                attention_mask=attention_mask,
                max_new_tokens=max_tokens,
                temperature=self.temperature,
                pad_token_id=self.tokenizer.eos_token_id
            )

        # NOTE(review): the whole first sequence is decoded; for decoder-only
        # models this presumably includes the echoed prompt - confirm. The
        # extraction patterns appear written to tolerate that.
        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
        # Drop blank lines and remove every backtick before pattern matching.
        generated_text = "\n".join([line for line in generated_text.splitlines() if line.strip()]).replace("`", "")

        return self._process_generated_text(generated_text, prompt_type)

    def _process_generated_text(self, generated_text, prompt_type):
        """Dispatch to the extractor matching ``prompt_type``."""
        if prompt_type == 'cot-like':
            return self._process_cot_like_text(generated_text)
        return self._process_regular_text(generated_text)

    def _process_cot_like_text(self, generated_text):
        """Extract the lookup table and cipher text from a dictionary-style answer.

        Returns:
            ``(lookup_table, cipher_text)``. The table is the parsed dict when
            the first usable match is valid JSON, the raw matched string when
            it is not, and ``{}`` when nothing usable matched. The cipher text
            is the first non-blank quoted value with its quotes stripped, or
            ``""`` when none matched.
        """
        lookup_table_matches = re.findall(CONFIG['PATTERNS']['LOOKUP_TABLE'], generated_text)
        cipher_text_matches = re.findall(CONFIG['PATTERNS']['CIPHER_TEXT'], generated_text)

        # len > 2 skips an empty "{}" table (e.g. the placeholder in the prompt).
        final_lookup_table = next((match[1] for match in lookup_table_matches if match[1] and len(match[1]) > 2), None)
        final_cipher_text = next((match[1].strip("\"'`") for match in cipher_text_matches if match[1] and len(match[1].strip()) > 0), "")

        try:
            parsed_lookup_table = json.loads(final_lookup_table) if final_lookup_table else {}
        except json.JSONDecodeError:
            # Keep the raw text rather than lose a table that is not valid JSON.
            parsed_lookup_table = final_lookup_table if final_lookup_table else {}

        return parsed_lookup_table, final_cipher_text

    def _process_regular_text(self, generated_text):
        """Extract the cipher text from a free-form answer.

        Returns:
            ``({}, cipher_text)``. The cipher text is taken from the labelled
            matches of the ``GENERAL_CIPHER`` pattern; when the pattern does
            not match at all, the whole stripped ``generated_text`` is returned.
        """
        matches = re.findall(CONFIG['PATTERNS']['GENERAL_CIPHER'], generated_text)

        cipher_texts = [match[-1].strip() for match in matches if match[-1].strip()]

        if not cipher_texts and matches:
            # Every capture was whitespace-only; reuse the first match instead
            # of scanning the text a second time with the same pattern. This
            # yields an empty cipher text, as before.
            cipher_texts.append(matches[0][-1].strip())

        # With more than three candidates the final one is discarded, so the
        # second-to-last candidate is returned.
        if len(cipher_texts) > 3:
            cipher_texts = cipher_texts[:-1]

        return {}, cipher_texts[-1] if cipher_texts else generated_text.strip()

def save_results(entry, json_file, is_last):
    """Append one entry to an open JSON-array file.

    The entry is dumped with a 4-space indent and followed by a bare newline
    when ``is_last`` is true, or by a comma and newline otherwise.
    """
    json.dump(entry, json_file, indent=4)
    if is_last:
        terminator = "\n"
    else:
        terminator = ",\n"
    json_file.write(terminator)

def write_config_line(experiment_id, total_lines, shift, type_of_plain_text, prompt_type, model, temperature, max_tokens):
    """Append one underscore-separated run description to the experiment log.

    The log path is ``CONFIG['PATHS']['CONFIG_FILE']``; the file is opened in
    append mode so earlier runs are kept.
    """
    record = f"{experiment_id}_{total_lines}_{shift}_{type_of_plain_text}_{prompt_type}_encrypt_0-{model}_{temperature}_{max_tokens}\n"
    log_path = CONFIG['PATHS']['CONFIG_FILE']
    with open(log_path, 'a') as log_file:
        log_file.write(record)

def process_model(model_path, lines_to_encode, shift, type_of_plain_text, model_index, total_models):
    """Encode every non-blank input line with one model, once per prompt type.

    For each entry of ``CONFIG['PROMPTS']`` a timestamp-named JSON file is
    written to ``CONFIG['PATHS']['OUTPUT_DIR']`` and one line is appended to
    the experiment log. The model and tokenizer are deleted afterwards and the
    CUDA cache is emptied.

    Args:
        model_path: checkpoint directory handed to ``TextEncoder``.
        lines_to_encode: raw input lines; blank lines are skipped.
        shift: Caesar shift inserted into the prompt and stored per entry.
        type_of_plain_text: label written to the experiment log.
        model_index: zero-based position of this model (progress message only).
        total_models: number of models in the run (progress message only).
    """
    output_dir = CONFIG['PATHS']['OUTPUT_DIR']
    # Create the directory before loading the model so a missing directory
    # cannot fail the run after the (expensive) load.
    os.makedirs(output_dir, exist_ok=True)

    encoder = TextEncoder(model_path, CONFIG['EXPERIMENT']['TEMPERATURE'])

    # Filter blank lines up front: the "last entry" decision must be based on
    # the entries actually written, otherwise a trailing blank line leaves a
    # dangling comma and the output is not valid JSON.
    plain_texts = [line.strip() for line in lines_to_encode if line.strip()]
    last_index = len(plain_texts) - 1

    for prompt_type, prompt_template in CONFIG['PROMPTS'].items():
        max_tokens = CONFIG['GENERATION']['MAX_TOKENS'][prompt_type]
        print(f"\nStarting encoding with model {encoder.model_name} using prompt type '{prompt_type}' (Model {model_index + 1} of {total_models}) with shift {shift}")

        # Second-resolution timestamp doubles as experiment id and file name.
        experiment_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file_path = os.path.join(output_dir, f"{experiment_id}.json")

        # The logged line count stays the number of input lines received.
        write_config_line(experiment_id, len(lines_to_encode), shift, type_of_plain_text,
                          prompt_type, encoder.model_name, encoder.temperature, max_tokens)

        with open(output_file_path, 'w') as json_file:
            json_file.write("[\n")

            progress = tqdm(plain_texts, desc=f"Encoding Progress with {encoder.model_name} ({prompt_type})", dynamic_ncols=True)
            for text_idx, plain_text in enumerate(progress):
                lookup_table, cipher_text = encoder.encode(prompt_template, plain_text, shift, max_tokens, prompt_type)

                entry = {
                    'plain_text': plain_text,
                    'shift': shift,
                    'cipher_text': cipher_text
                }
                # Only the cot-like prompt asks the model for a lookup table.
                if prompt_type == 'cot-like':
                    entry['lookup_table'] = lookup_table

                save_results(entry, json_file, text_idx == last_index)

            json_file.write("]\n")

        print(f"Finished encoding with {encoder.model_name} using prompt type '{prompt_type}' and shift {shift}. Results saved to {output_file_path}\n")

    # Clean up: drop the references and release cached CUDA memory before the
    # next model is loaded. model_name is a plain string and stays available.
    del encoder.model
    del encoder.tokenizer
    torch.cuda.empty_cache()
    print(f"Freed up GPU memory after processing {encoder.model_name}. Moving to the next model.\n")

def main():
    """Run the Caesar-cipher experiment for every configured shift and model.

    Reads the first ``CONFIG['EXPERIMENT']['TOTAL_LINES']`` lines of the input
    file and passes them to ``process_model`` for each combination of shift
    value and model path. The input file's base name (without extension) is
    used as the plain-text type label.
    """
    setup_environment()

    input_path = CONFIG['PATHS']['INPUT_FILE']

    # Read input file. Decode explicitly as UTF-8 so non-ASCII input (the
    # Greek text set) does not depend on the platform's locale default.
    # NOTE(review): assumes the data files are stored as UTF-8 - confirm.
    with open(input_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()

    lines_to_encode = lines[:CONFIG['EXPERIMENT']['TOTAL_LINES']]
    type_of_plain_text = os.path.splitext(os.path.basename(input_path))[0]

    # Process each shift value and model
    total_models = len(CONFIG['MODELS'])
    for shift in CONFIG['EXPERIMENT']['SHIFT_VALUES']:
        for model_index, model_path in enumerate(CONFIG['MODELS']):
            process_model(model_path, lines_to_encode, shift, type_of_plain_text,
                          model_index, total_models)

    print("All models, prompt types, and shift values have been processed. Exiting.\n")


if __name__ == "__main__":
    main()

In [None]:
# =======================
# Vigenere Cipher on Natural Text
# =======================

import os
import re
import json
import torch
import warnings
import ast
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from datetime import datetime

# Experiment configuration for the Vigenère-cipher cell. Running this cell
# rebinds the name CONFIG, so it replaces any CONFIG defined by an earlier cell.
CONFIG = {
    'PATHS': {
        'INPUT_FILE': './data/original/natural.txt',
        'OUTPUT_DIR': './data/encoded/vigenere-cipher/',
        # Opened in append mode: one log line per (model, prompt type) run.
        'CONFIG_FILE': './vigenere-cipher-experiments-log.txt'
    },
    # Local checkpoint directories. The model name is taken as the
    # second-to-last '/'-separated component, so the trailing slash matters.
    'MODELS': [
        # '/storage/ukp/shared/shared_model_weights/models--llama-3/8B-Instruct/',
        # '/storage/ukp/shared/shared_model_weights/models--llama-3/70B-Instruct/',
        # '/storage/ukp/shared/shared_model_weights/models--mistralai--Mistral-7B-Instruct-v0.3/',
        '/storage/ukp/shared/shared_model_weights/models--mistralai--Mixtral-8x7B-Instruct-v0.1/'
    ],
    'EXPERIMENT': {
        # Number of leading lines taken from INPUT_FILE.
        'TOTAL_LINES': 25,
        # Substituted for "$KEY$" in the prompt templates.
        'KEY': 'Benchmark',
        # Passed to model.generate() as `temperature`.
        'TEMPERATURE': 0.01,
        # Passed to torch.manual_seed() in main().
        'SEED': 2266
    },
    'GENERATION': {
        # max_new_tokens per prompt type; looked up with the PROMPTS key.
        'MAX_TOKENS': {
            'base': 64,
            'cot-like': 512,
            # NOTE(review): 2024 may have been meant as 2048 - confirm.
            'default': 2024,
        }
    },
    # Only the 'default' prompt is active; the plain text is appended directly
    # after the template.
    'PROMPTS': {
        # 'base': "Encode the following text using a Vigenère cipher with the key '$KEY$'. Output the cipher text only.\n\nplain text: ",
        # 'cot-like': "Encode the following text using a Vigenère cipher with the key '$KEY$'. Output a lookup table and the cipher text in a Python dictionary: {\"lookup_table\": {}, \"cipher_text\":...}. Output the dictionary only.\n\nplain text: ",
        'default': "Encode the following text using a Vigenère cipher with the key '$KEY$'.\n\nplain text: "
    },
    # ------------------------------------------------------------------------
    # Patterns (applied with re.search and re.IGNORECASE; group 1 is the payload)
    # NOTE(review): every cipher-text group below except CIPHER_TEXT_DICT is
    # [A-Za-z0-9]+, i.e. a single alphanumeric run, so a cipher text that
    # contains spaces or punctuation is cut at the first such character -
    # confirm this is intended for natural-text input.
    # ------------------------------------------------------------------------
    'PATTERNS': {
        # Matches: "lookup_table": {...} when it is directly followed by
        # , "cipher_text": - group 1 is the shortest such {...} span.
        'LOOKUP_TABLE': r"(?:\"lookup_table\":|\'lookup_table\':)\s*(\{[\s\S]*?\})(?=,\s*(?:\"cipher_text\":|\'cipher_text\':))",
      
        # Matches: "cipher_text": "<value>" - group 1 is the non-empty value
        # including its surrounding quote characters.
        'CIPHER_TEXT_DICT': r"(?:\"cipher_text\":|\'cipher_text\':|`cipher_text`:)\s*(\"[^\"]+\"|'[^']+'|`[^`]+`)",
  
        # Matches: "Answer: The cipher text is: <run>"
        'CIPHER_TEXT_ANSWER': r"Answer:\s*The cipher text is:\s*([A-Za-z0-9]+)",
        
        # Matches a run that follows one of several short labels.
        'CIPHER_TEXT_DIRECT': r"(?:Cipher text:|Encrypted text:|Vigenère cipher:|Result:|Output:)\s*([A-Za-z0-9]+)",

        # Matches a quoted alphanumeric run that does not start with the key.
        # NOTE(review): the key "Benchmark" is hard-coded here and must be
        # kept in sync with EXPERIMENT['KEY'] by hand.
        'CIPHER_TEXT_QUOTED': r"[\"'`](?!Benchmark)([A-Za-z0-9]+)[\"'`]",
        # Matches if the cipher text is on its own line between newlines:
        'CIPHER_TEXT_NEWLINE': r"\n([A-Za-z0-9]+)(?:\n|$)",

        # Matches a run that follows one of several longer sentence-style labels.
        'GENERAL_OUTPUT': r"(?:The encoded text is:|The encryption result is:|The Vigenère cipher text is:|Here's the encrypted text:|The result is:)\s*([A-Za-z0-9]+)"
    }
}

class TextEncoder:
    """Prompt an 8-bit quantized causal LM for a Vigenère encoding and extract it.

    Attributes:
        tokenizer: tokenizer loaded from ``model_path``.
        model: causal LM loaded from ``model_path`` with ``device_map="auto"``.
        temperature: value forwarded to ``model.generate``.
        model_name: final directory name of ``model_path`` (used in logs).
    """

    def __init__(self, model_path, temperature=0.01):
        """Load the tokenizer and the 8-bit quantized model from ``model_path``."""
        quantization_config = BitsAndBytesConfig(load_in_8bit=True)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=True)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quantization_config,
            device_map="auto"
        )
        self.temperature = temperature
        # Strip trailing slashes so the name is the final directory whether or
        # not the configured path ends with "/".
        self.model_name = model_path.rstrip('/').split('/')[-1]

    def encode(self, prompt_template, text, key, max_tokens, prompt_type):
        """Fill ``$KEY$`` in the template, append ``text`` and run generation.

        Returns:
            The ``(lookup_table, cipher_text)`` tuple from ``generate_text``.
        """
        prompt = prompt_template.replace("$KEY$", key) + text
        print(f"\nPrompt:\n{prompt}\n")
        return self.generate_text(prompt, max_tokens, prompt_type)

    def generate_text(self, prompt, max_tokens, prompt_type):
        """Generate a completion for ``prompt``, print it and post-process it.

        Args:
            prompt: full prompt string.
            max_tokens: passed as ``max_new_tokens``.
            prompt_type: selects the post-processing path ('cot-like' or other).

        Returns:
            ``(lookup_table, cipher_text)``.
        """
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)

        # NOTE(review): `temperature` is passed without `do_sample=True`; per
        # the transformers generation docs temperature only affects sampling,
        # so this presumably runs greedy decoding - confirm that is intended.
        with torch.no_grad():
            outputs = self.model.generate(
                inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                max_new_tokens=max_tokens,
                temperature=self.temperature,
                pad_token_id=self.tokenizer.eos_token_id
            )

        # NOTE(review): the whole first sequence is decoded; for decoder-only
        # models this presumably includes the echoed prompt - confirm.
        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
        # Drop blank lines before pattern matching.
        generated_text = "\n".join(line for line in generated_text.splitlines() if line.strip())
        print("***************************************")
        print(f"Generated text:\n{generated_text}\n")
        print("***************************************")
        return self._process_generated_text(generated_text, prompt_type)

    def _process_generated_text(self, generated_text, prompt_type):
        """Dispatch to the extractor matching ``prompt_type``."""
        if prompt_type == 'cot-like':
            return self._process_cot_output(generated_text)
        return self._process_regular_output(generated_text)

    @staticmethod
    def _parse_lookup_table(lookup_str):
        """Parse the text of a dict literal into a JSON-serializable lookup table.

        Args:
            lookup_str: the ``{...}`` text captured after the "lookup_table" key.

        Returns:
            The parsed dict. If that dict itself has a 'lookup_table' key, the
            value under that key is returned instead. ``{}`` is returned when
            the text cannot be parsed, is not a dict, or cannot be written as
            JSON.
        """
        try:
            # literal_eval only evaluates Python literals, never arbitrary code.
            parsed = ast.literal_eval(lookup_str)
        except (SyntaxError, ValueError, TypeError) as e:
            print(f"Warning: Could not parse lookup table. Error: {e}")
            return {}

        if not isinstance(parsed, dict):
            # A brace literal without "key: value" pairs evaluates to a set.
            print("Warning: Lookup table is not a dictionary.")
            return {}

        # The capture already excludes the outer "lookup_table" key, so the
        # parsed dict IS the table; only unwrap when the model nested it again.
        table = parsed['lookup_table'] if 'lookup_table' in parsed else parsed

        try:
            # The table is later written with json.dump; reject anything that
            # would make that call fail (e.g. tuple keys).
            json.dumps(table)
        except (TypeError, ValueError) as e:
            print(f"Warning: Could not parse lookup table. Error: {e}")
            return {}
        return table

    def _process_cot_output(self, generated_text):
        """Extract the lookup table and cipher text from a dictionary-style answer.

        Returns:
            ``(lookup_table, cipher_text)``. The table is ``{}`` when no table
            matched or it could not be parsed. The cipher text comes from the
            ``CIPHER_TEXT_DICT`` pattern, falling back to
            ``_extract_cipher_text``.
        """
        # Try to extract lookup table
        lookup_match = re.search(CONFIG['PATTERNS']['LOOKUP_TABLE'], generated_text, flags=re.IGNORECASE)
        lookup_table = {}
        if lookup_match:
            lookup_table = self._parse_lookup_table(lookup_match.group(1))
        else:
            print("Warning: No lookup table pattern matched.")

        # Try to extract cipher text from dictionary format
        dict_match = re.search(CONFIG['PATTERNS']['CIPHER_TEXT_DICT'], generated_text, flags=re.IGNORECASE)
        if dict_match:
            cipher_text = dict_match.group(1).strip('"\'`')
        else:
            # Fallback to the broader pattern matching
            cipher_text = self._extract_cipher_text(generated_text)

        print(f"Processed COT output:\nLookup table: {lookup_table}\nCipher text: {cipher_text}\n")
        return lookup_table, cipher_text

    def _process_regular_output(self, generated_text):
        """Extract the cipher text from a free-form answer; the table is ``{}``."""
        cipher_text = self._extract_cipher_text(generated_text)
        print(f"Processed regular output:\nCipher text: {cipher_text}\n")
        return {}, cipher_text

    def _extract_cipher_text(self, text):
        """
        Attempt multiple regex patterns in sequence with case-insensitive matching.
        As soon as one matches, return the captured group.
        If none match, return the cleaned version of the entire text as a last resort.
        """
        # Order matters: the most specific labelled formats are tried first.
        patterns = [
            CONFIG['PATTERNS']['CIPHER_TEXT_ANSWER'],
            CONFIG['PATTERNS']['CIPHER_TEXT_DIRECT'],
            CONFIG['PATTERNS']['CIPHER_TEXT_QUOTED'],
            CONFIG['PATTERNS']['CIPHER_TEXT_NEWLINE'],
            CONFIG['PATTERNS']['GENERAL_OUTPUT']
        ]

        for pattern in patterns:
            match = re.search(pattern, text, flags=re.IGNORECASE)
            if match:
                captured = match.group(1).strip()
                print(f"Matched pattern: {pattern}\nCaptured cipher text: {captured}")
                return captured

        # If no pattern matches, return the text with all whitespace runs
        # collapsed to single spaces as a last resort.
        cleaned_text = ' '.join(text.split()).strip()
        print(f"Warning: No pattern matched. Using cleaned text: {cleaned_text}")
        return cleaned_text

def process_model(model_path, lines_to_encode):
    """Encode every non-blank input line with one model, once per prompt type.

    For each entry of ``CONFIG['PROMPTS']`` a timestamp-named JSON file is
    written to ``CONFIG['PATHS']['OUTPUT_DIR']`` and one line is appended to
    the experiment log. The model and tokenizer are deleted afterwards and the
    CUDA cache is emptied.

    Args:
        model_path: checkpoint directory handed to ``TextEncoder``.
        lines_to_encode: raw input lines; blank lines are skipped.
    """
    output_dir = CONFIG['PATHS']['OUTPUT_DIR']
    key = CONFIG['EXPERIMENT']['KEY']
    # Create the directory once, before the expensive model load.
    os.makedirs(output_dir, exist_ok=True)

    encoder = TextEncoder(model_path, CONFIG['EXPERIMENT']['TEMPERATURE'])

    # Keep (original index, stripped text) for non-blank lines only. The
    # comma/no-comma decision must follow the entries actually written;
    # basing it on the raw line index leaves a dangling comma (invalid JSON)
    # whenever the final input lines are blank.
    pending = [(i, line.strip()) for i, line in enumerate(lines_to_encode) if line.strip()]
    last_position = len(pending) - 1

    for prompt_type, prompt_template in CONFIG['PROMPTS'].items():
        # Second-resolution timestamp doubles as experiment id and file name.
        experiment_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = os.path.join(output_dir, f"{experiment_id}.json")

        print(f"\n{'='*50}")
        print(f"Processing {encoder.model_name} with prompt type '{prompt_type}'")
        print(f"{'='*50}\n")

        with open(CONFIG['PATHS']['CONFIG_FILE'], 'a') as config_file:
            config_file.write(f"{experiment_id}_{len(lines_to_encode)}_{key}_{prompt_type}_{encoder.model_name}\n")

        with open(output_file, 'w') as json_file:
            json_file.write("[\n")

            for position, (i, plain_text) in enumerate(tqdm(pending)):
                # The progress message keeps the original input line number.
                print(f"\nProcessing line {i+1}/{len(lines_to_encode)}:")
                print(f"Input text: {plain_text}")

                lookup_table, cipher_text = encoder.encode(
                    prompt_template,
                    plain_text,
                    key,
                    CONFIG['GENERATION']['MAX_TOKENS'][prompt_type],
                    prompt_type
                )

                result = {
                    'plain_text': plain_text,
                    'key': key,
                    'cipher_text': cipher_text,
                }
                # Only the cot-like prompt asks the model for a lookup table.
                if prompt_type == 'cot-like':
                    result['lookup_table'] = lookup_table

                json.dump(result, json_file, indent=4)
                json_file.write(",\n" if position < last_position else "\n")

                print(f"Result: {json.dumps(result, indent=2)}\n")

            json_file.write("]\n")

        print(f"\nResults saved to: {output_file}")

    # Drop the references and release cached CUDA memory before the next model.
    del encoder.model, encoder.tokenizer
    torch.cuda.empty_cache()

def main():
    """Run the Vigenère-cipher experiment for every configured model.

    Disables tokenizer parallelism, silences all warnings, seeds torch,
    ensures the output directory exists, reads the first
    ``CONFIG['EXPERIMENT']['TOTAL_LINES']`` input lines and hands them to
    ``process_model`` once per model path.
    """
    # Setup
    os.environ["TOKENIZERS_PARALLELISM"] = "false"
    warnings.filterwarnings("ignore")
    torch.manual_seed(CONFIG['EXPERIMENT']['SEED'])

    # Create output directory if it doesn't exist
    os.makedirs(CONFIG['PATHS']['OUTPUT_DIR'], exist_ok=True)

    print("\nStarting Vigenère cipher encoding process...")
    print(f"Using key: {CONFIG['EXPERIMENT']['KEY']}")
    print(f"Input file: {CONFIG['PATHS']['INPUT_FILE']}")
    print(f"Output directory: {CONFIG['PATHS']['OUTPUT_DIR']}\n")

    # Read input file. Decode explicitly as UTF-8 so the result does not
    # depend on the platform's locale default.
    # NOTE(review): assumes the data files are stored as UTF-8 - confirm.
    with open(CONFIG['PATHS']['INPUT_FILE'], 'r', encoding='utf-8') as file:
        lines = file.readlines()[:CONFIG['EXPERIMENT']['TOTAL_LINES']]

    print(f"Loaded {len(lines)} lines for processing\n")

    # Process each model
    for model_path in CONFIG['MODELS']:
        # Final directory name, with or without a trailing slash on the path.
        print(f"\nProcessing model: {model_path.rstrip('/').split('/')[-1]}")
        process_model(model_path, lines)

    print("\nAll processing completed successfully.")


if __name__ == "__main__":
    main()

