# Package and Global Variable Initialization

In [None]:
!pip install datasets pandas tqdm torch transformers python-Levenshtein accelerate

In [None]:
import csv
import os
import re
import unicodedata

import Levenshtein
import pandas as pd
from datasets import load_dataset
from tqdm.notebook import tqdm


# Root folder of the project; the experiment result CSVs below are written inside it.
project_path = './LLMs-As-Cryptanalysts'
# exist_ok=True lets this cell be re-run without failing when the folder already exists.
os.makedirs(project_path, exist_ok=True)

print("Project folder created successfully!")

# Downloading Cryptographic Dataset 

In [None]:
# Load the encryption dataset; later cells read the 'plain_text', 'cipher_text'
# and 'algorithm' fields of its 'train' split.
crypt_dataset = load_dataset("Sakonii/EncryptionDataset")

In [None]:
# Print a summary of the loaded dataset object as a quick sanity check.
print(crypt_dataset)

# Character-Level Tokenization Experiments

In [None]:
def add_character_spacing(dataset):
  """Build space-delimited (character-level) copies of one example's texts.

  Despite its name, `dataset` is indexed like a single example: a mapping
  whose "cipher_text" and "plain_text" entries are strings.

  Returns:
      A dict with two new fields, "ciphertext_char_level" and
      "plaintext_char_level", where every character of the corresponding
      text is separated from the next by a single space.
  """
  spaced_cipher = " ".join(dataset["cipher_text"])
  spaced_plain = " ".join(dataset["plain_text"])
  return {
      "ciphertext_char_level": spaced_cipher,
      "plaintext_char_level": spaced_plain,
  }


# Apply add_character_spacing to every example so each row also carries the
# space-delimited "ciphertext_char_level" / "plaintext_char_level" fields.
updated_crypt_dataset = crypt_dataset.map(add_character_spacing)

In [None]:
# Inspect the first training example to check the new character-level fields.
updated_crypt_dataset['train'][0]

# Setting up Mistral-7B-Instruct-v0.3

In [None]:
# Setting up Mistral using the Hugging Face transformers library
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

model_id = "mistralai/Mistral-7B-Instruct-v0.3"
print('Loading Mistral Model...')
# torch_dtype is a model-loading option: it selects the precision the weights are
# loaded in. It has no effect on a tokenizer, so it is passed here rather than to
# AutoTokenizer. device_map="auto" lets accelerate place the weights on the
# available device(s).
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",
    torch_dtype=torch.bfloat16,
)

print('Loading Mistral Tokenizer...')
tokenizer = AutoTokenizer.from_pretrained(model_id)

print("Model loaded successfully!")

In [None]:
def query_local_mistral(user_prompt, system_prompt=None):
  """Send one system + user exchange to the loaded Mistral model and return its reply.

  Relies on the module-level `model` and `tokenizer` objects.

  Args:
      user_prompt: Text of the user turn.
      system_prompt: Text of the system turn. When None, the default
          'You are an expert in cryptanalysis.' instruction is used.

  Returns:
      Only the text generated by the model (the prompt and special tokens
      are excluded), with surrounding whitespace stripped.
  """
  if system_prompt is None:
      system_prompt = 'You are an expert in cryptanalysis.'

  # Applying mistral's chat template to get the final string
  messages = [
      {"role": "system", "content": f"{system_prompt}"},
      {"role" : "user", "content": user_prompt}
  ]

  # Get the formatted chat string
  chat_string = tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)

  if tokenizer.pad_token_id is None:
      tokenizer.pad_token_id = tokenizer.eos_token_id

  # Now tokenize the string to get input_ids and attention_mask.
  # Chat templates are expected to emit the special tokens they need, so
  # add_special_tokens=False avoids prepending a second BOS token here.
  inputs = tokenizer(
      chat_string,
      return_tensors="pt",
      padding=True,
      truncation=True,
      add_special_tokens=False,
  )

  # Follow the model's own placement instead of hard-coding "cuda"; the model
  # is loaded with device_map="auto".
  input_ids = inputs["input_ids"].to(model.device)
  attention_mask = inputs["attention_mask"].to(model.device)

  # Generate from model, passing attention_mask and pad_token_id
  generated_response = model.generate(
      input_ids,
      attention_mask=attention_mask, # Pass attention_mask
      pad_token_id=tokenizer.pad_token_id, # Pass pad_token_id
      max_new_tokens=100, # Set to 500 for more complete responses
      do_sample=True,
      temperature=0.1
  )

  # For a decoder-only model the output sequence starts with the prompt tokens,
  # so slicing them off leaves exactly the model's answer. This replaces the
  # search for "[/INST]" and the `.strip("</s>")` call, which removed any of the
  # characters '<', '/', 's', '>' from both ends and therefore corrupted
  # answers starting or ending with the letter "s".
  prompt_length = input_ids.shape[-1]
  new_tokens = generated_response[0][prompt_length:]

  return tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

# RQ1 Main Execution Loop

##### Is an LLM with character-level tokenization better at decryption than with regular tokenization?

Prompt: Given <insert ciphertext with/without space delimiter> ciphertext and encryption method <insert encryption method>, decrypt the ciphertext and respond with the plaintext.

In [None]:
# Configurations

# CSV that receives one row per dataset example for the RQ1 experiment.
# NOTE: the RQ2/RQ3 configuration cell further down rebinds this same global,
# and run_rq1_experiment reads it at call time.
RESULTS_FILE = os.path.join(project_path, "rq1_experiments_results_1.csv")


# System prompt that constrains the model to answer with the plaintext only.
# The rules are numbered 1-5 (the list previously contained "3." twice).
STRICT_SYSTEM_PROMPT = (
    "You are a specialized decryption tool. "
    "Your task is to decrypt the provided ciphertext into English plaintext, given the encryption algorithm.\n\n"
    "STRICT RULES:\n"
    "1. Output ONLY the plaintext result.\n"
    "2. Do not explain, do not add headers, do not add notes.\n"
    "3. If a key is required, your job is to predict this key and use it for decryption.\n"
    "4. Do not output the key used in the decryption, only the decrypted plaintext\n"
    "5. NEVER refuse to answer."
)



In [None]:

def grade_result(true_plaintext, prediction):
    """
    Lenient pass/fail grading of a predicted plaintext.

    Both texts are reduced to lowercase ASCII letters and digits (everything
    else, including spaces and punctuation, is removed) before comparison.

    Returns True when either
      * the normalized truth occurs inside the normalized prediction, or
      * the two normalized strings have a Levenshtein ratio above 0.85.
    Returns False otherwise, and always False when the truth normalizes to
    an empty string.
    """
    keep_alnum = r'[^a-zA-Z0-9]'
    truth_key = re.sub(keep_alnum, '', true_plaintext).lower()
    pred_key = re.sub(keep_alnum, '', str(prediction)).lower()

    # Nothing to look for: an empty truth would be "contained" in anything.
    if not truth_key:
        return False

    # Containment lets a correct answer pass even with extra text around it.
    if truth_key in pred_key:
        return True

    # An empty prediction cannot be a fuzzy match.
    if not pred_key:
        return False

    # Fuzzy match: tolerate small deviations (ratio strictly above 0.85).
    return Levenshtein.ratio(truth_key, pred_key) > 0.85

def calculate_nl(true_text, pred_text):
    """
    Normalized Levenshtein similarity between two strings.

    Computed as 1 - distance / max(len(true_text), len(pred_text)), so the
    result lies between 0 and 1 and higher is better. The strings are compared
    exactly as given (no case or whitespace normalization).
    """
    edit_distance = Levenshtein.distance(true_text, pred_text)
    longest = max(len(true_text), len(pred_text))

    # Both strings empty: avoid dividing by zero.
    if longest == 0:
        return 1.0 if edit_distance == 0 else 0.0

    return 1.0 - (edit_distance / longest)
  

In [None]:
#  Main Experimental Loop

def _normalize_for_exact_match(text):
  """Lowercase, trim and drop spaces so exact-match grading ignores case and spacing."""
  return text.lower().strip().replace(" ", "")


def _run_decryption_trial(ciphertext, cipher_type, true_plaintext):
  """Ask the model to decrypt one ciphertext and score its answer.

  Returns:
      (prediction, exact_match, nl_score) where `exact_match` compares the
      space/case-normalized texts and `nl_score` is calculate_nl on the raw
      texts.
  """
  prompt = (
      f"Given the ciphertext: '{ciphertext}' "
      f"and the encryption method '{cipher_type}', "
      f"decrypt the ciphertext and respond with the plaintext."
  )
  prediction = query_local_mistral(prompt, system_prompt=STRICT_SYSTEM_PROMPT)

  exact_match = _normalize_for_exact_match(prediction) == _normalize_for_exact_match(true_plaintext)
  nl_score = calculate_nl(true_plaintext, prediction)
  return prediction, exact_match, nl_score


def run_rq1_experiment(dataset):
  """Run the RQ1 decryption experiment and append one CSV row per example.

  For every row of dataset['train'] the model is queried twice: once with the
  original ciphertext and once with the space-delimited ciphertext. Both
  answers are scored and written to the CSV at RESULTS_FILE (read at call time).

  The run is resumable: rows whose 'dataset_index' is already present in the
  results file are skipped.

  Args:
      dataset: object whose 'train' split yields rows with 'plain_text',
          'cipher_text', 'algorithm' and 'ciphertext_char_level'
          ('difficulty' is optional and defaults to 'Unknown').
  """
  # Checking existing progress
  processed_indices = set()
  file_exists = os.path.isfile(RESULTS_FILE)
  # A zero-byte file (e.g. left by an interrupted first run) still needs a header.
  has_rows = file_exists and os.path.getsize(RESULTS_FILE) > 0

  if has_rows:
    # Read the existing csv file to see which rows are already done
    try:
      existing_df = pd.read_csv(RESULTS_FILE)
      # The index column is written as 'dataset_index' (see fieldnames below).
      processed_indices = set(existing_df["dataset_index"].tolist())
      print(f"Found existing progress: {len(processed_indices)} rows already processed.")
    except Exception as e:
      print(f"Warning: Could not read existing file. Starting afresh. Error: {e}")

  # Defining columns for the results file
  fieldnames = [
      'dataset_index',
      'cipher_type',
      'difficulty',
      'true_plaintext',
      'ciphertext_original',
      'prediction_standard',
      'EM_standard',
      'levenstein_standard',
      'ciphertext_spaced',
      'prediction_char_level',
      'EM_char_level',
      'levenstein_char_level'
  ]

  # Append mode creates the file when it does not exist yet.
  with open(RESULTS_FILE, mode="a", newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)

    # Write header only if the results file has no content yet
    if not has_rows:
      writer.writeheader()

    # Now, let's iterate through the dataset to run the experiment
    for index, row in tqdm(enumerate(dataset['train']), total=len(dataset['train']), desc= "RQ1 Experiment"):

      # if index is already processed skip it
      if index in processed_indices:
        continue

      true_pt = row['plain_text']
      cipher_orig = row['cipher_text']
      cipher_type = row['algorithm']
      cipher_spaced = row['ciphertext_char_level']

      # A. Standard Tokenization Run
      pred_std, em_correct_std, nl_std = _run_decryption_trial(cipher_orig, cipher_type, true_pt)

      # B. Character-Level Tokenization Run
      pred_char, em_correct_char, nl_char = _run_decryption_trial(cipher_spaced, cipher_type, true_pt)

      # Writing results to the results csv file
      writer.writerow({
          'dataset_index': index,
          'cipher_type': cipher_type,
          'difficulty': row.get('difficulty', 'Unknown'),
          'true_plaintext': true_pt,
          'ciphertext_original': cipher_orig,
          'prediction_standard': pred_std,
          'EM_standard': em_correct_std,
          'levenstein_standard': nl_std,
          'ciphertext_spaced': cipher_spaced,
          'prediction_char_level': pred_char,
          'EM_char_level': em_correct_char,
          'levenstein_char_level': nl_char
      })
      # Flush per row so an interrupted run keeps everything computed so far.
      f.flush()


In [None]:
# Running RQ1 on the dataset augmented with the character-level columns;
# results are written to the CSV at RESULTS_FILE.
run_rq1_experiment(updated_crypt_dataset)

# RQ2 and RQ3 Main Execution Loop

###### RQ2: Can LLMs recognize the type of cipher given the ciphertext and the plaintext?
###### RQ3: Can LLMs recognize the type of cipher given only the ciphertext?


In [None]:

# CSV that receives one row per dataset example for the RQ2/RQ3 experiment.
# NOTE: this rebinds the same RESULTS_FILE global that run_rq1_experiment reads
# at call time, so calling run_rq1_experiment after this cell has run would
# target this file instead of the RQ1 one.
RESULTS_FILE = os.path.join(project_path, "rq2_rq3_results.csv")



# --- RQ2 SYSTEM PROMPT (Cipher + Plaintext) ---
# Plain (non-f) string: the prompt contains no placeholders.
RQ2_SYSTEM_PROMPT = """You are an expert assistant that identifies encryption methods given ciphertext and the associated plaintext.
Respond with ONLY one of these labels below and nothing else.
Strict Output Rules:
1. Output ONLY the name of the encryption algorithm (e.g., "Caesar Cipher", "AES", "Morse Code").
2. Do NOT explain your reasoning.
3. Do NOT provide code or math.
4. Do NOT write full sentences like "The answer is...".
5. If the method is unrecognizable, output your best guess based on the transformation pattern.
6. Do not add the key, for example if it is Caesar Cipher, just output Caesar Cipher.
7. Respond only with one of the following labels: "Caesar Cipher", "Atbash Cipher", "Morse Code", "Bacon Cipher", "Rail Fence Cipher", "Vigenère Cipher", "Playfair Cipher", "RSA Cipher", "AES Cipher".

"""

# --- RQ3 SYSTEM PROMPT (Ciphertext Only) ---
# Plain (non-f) string: the prompt contains no placeholders.
RQ3_SYSTEM_PROMPT = """You are an expert assistant that identifies encryption methods given ciphertext.
Respond with ONLY one of these labels below and nothing else.


Strict Output Rules:
1. Output ONLY the name of the cipher.
2. Do NOT add conversational filler (e.g., "This looks like...").
3. Do NOT explain the features you observed.
4. Output exactly one name.
5. Do not add the key, for example if it is Caesar Cipher, just output Caesar Cipher.
6. Respond only with one of the following names: "Caesar Cipher", "Atbash Cipher", "Morse Code", "Bacon Cipher", "Rail Fence Cipher", "Vigenère Cipher", "Playfair Cipher", "RSA Cipher", "AES Cipher".
"""

def get_rq2_prompt(ciphertext, plaintext):
  """Build the RQ2 user prompt: identify the cipher from a ciphertext/plaintext pair.

  Both arguments are interpolated into the prompt as-is (no quoting).
  """
  question = "what type of encryption method converts the plaintext to the ciphertext?"
  return f"Given {ciphertext} and {plaintext}, {question}"

def get_rq3_prompt(ciphertext):
  """Build the RQ3 user prompt: identify the cipher from the ciphertext alone.

  The ciphertext is interpolated into the prompt as-is (no quoting).
  """
  question = "what type of encryption method was used to encrypt this ciphertext?"
  return f"Given {ciphertext} {question}"


def check_classification(prediction, true_label):
    """
    Checks if the true label (e.g., 'Caesar Cipher') is in the prediction.

    Both values are converted to str and canonicalized before the check:
    accents are folded to their base letters, every character that is not an
    ASCII letter or digit is removed, and the result is lowercased. The
    prediction counts as correct when the canonical true label occurs anywhere
    inside the canonical prediction, so a short label such as "RSA" matches the
    prediction "RSA Cipher", and "Caesar Cipher" matches "caesar-cipher".

    Returns:
        True on a match, False otherwise. A true label that canonicalizes to
        an empty string never matches (it would otherwise be "contained" in
        every prediction).
    """
    def _canonical(value):
        # NFKD splits an accented letter into base letter + combining mark; the
        # ASCII round-trip then drops the mark, so "Vigenère" becomes "Vigenere"
        # instead of losing the whole letter to the regex below.
        folded = (
            unicodedata.normalize("NFKD", str(value))
            .encode("ascii", "ignore")
            .decode("ascii")
        )
        return re.sub(r'[^a-zA-Z0-9]', '', folded).lower()

    pred_clean = _canonical(prediction)
    true_clean = _canonical(true_label)

    if not true_clean:
        return False

    return true_clean in pred_clean

In [None]:
# --- MAIN LOOP ---
def run_rq2_rq3_experiment(dataset):
    """Run the RQ2/RQ3 cipher-identification experiment, one CSV row per example.

    For every row of dataset['train'] the model is queried four times, in this
    order: RQ2 (ciphertext + plaintext) with the original and with the
    space-delimited ciphertext, then RQ3 (ciphertext only) with the original
    and with the space-delimited ciphertext. Each answer is graded with
    check_classification against the row's 'algorithm' value and the results
    are appended to the CSV at RESULTS_FILE (read at call time).

    The run is resumable: rows whose 'dataset_index' is already present in the
    results file are skipped.

    Args:
        dataset: object whose 'train' split yields rows with 'cipher_text',
            'plain_text', 'algorithm' and 'ciphertext_char_level'
            ('difficulty' is optional and defaults to 'Unknown').
    """
    # 1. Resume Logic
    processed_indices = set()
    file_exists = os.path.isfile(RESULTS_FILE)
    # A zero-byte file (e.g. left by an interrupted first run) still needs a header.
    has_rows = file_exists and os.path.getsize(RESULTS_FILE) > 0
    if has_rows:
        try:
            processed_indices = set(pd.read_csv(RESULTS_FILE)['dataset_index'].tolist())
            print(f"Resuming: {len(processed_indices)} rows already done.")
        except Exception as e:
            # Still best-effort, but report the problem instead of hiding it.
            print(f"Warning: could not read existing results; no rows will be skipped. Error: {e}")

    field_names = [
        'dataset_index', 'true_cipher_type', 'difficulty','true_plaintext','ciphertext_original',
        # RQ2 Columns
        'rq2_std_pred', 'rq2_std_correct',
        'rq2_char_pred', 'rq2_char_correct',
        # RQ3 Columns
        'rq3_std_pred', 'rq3_std_correct',
        'rq3_char_pred', 'rq3_char_correct'
    ]

    # Append mode creates the file when it does not exist yet.
    with open(RESULTS_FILE, mode='a', newline='', encoding='utf-8') as f:

        writer = csv.DictWriter(f, fieldnames=field_names)
        if not has_rows:
            writer.writeheader()

        # Iterate
        for index, row in tqdm(enumerate(dataset['train']), total=len(dataset['train']), desc="RQ2 & RQ3 Classification"):
            if index in processed_indices:
                continue

            cipher_orig = row['cipher_text']
            plaintext_orig = row['plain_text']
            true_cipher = row['algorithm']
            cipher_spaced = row['ciphertext_char_level']

            # (column prefix, user prompt, system prompt), in query order.
            trials = [
                # RQ2: Given Cipher + Plain (Standard / Char Level)
                ('rq2_std', get_rq2_prompt(cipher_orig, plaintext_orig), RQ2_SYSTEM_PROMPT),
                ('rq2_char', get_rq2_prompt(cipher_spaced, plaintext_orig), RQ2_SYSTEM_PROMPT),
                # RQ3: Given Cipher Only (Standard / Char Level)
                ('rq3_std', get_rq3_prompt(cipher_orig), RQ3_SYSTEM_PROMPT),
                ('rq3_char', get_rq3_prompt(cipher_spaced), RQ3_SYSTEM_PROMPT),
            ]

            result_row = {
                'dataset_index': index,
                'true_cipher_type': true_cipher,
                'difficulty': row.get('difficulty', 'Unknown'),
                'true_plaintext': plaintext_orig,
                'ciphertext_original': cipher_orig,
            }

            for prefix, prompt, system_prompt in trials:
                prediction = query_local_mistral(prompt, system_prompt=system_prompt)
                result_row[f'{prefix}_pred'] = prediction.strip()
                result_row[f'{prefix}_correct'] = check_classification(prediction, true_cipher)

            # --- Save ---
            writer.writerow(result_row)
            # Flush per row so an interrupted run keeps everything computed so far.
            f.flush()

In [None]:
# Run the RQ2/RQ3 classification experiment; rows whose dataset_index is already
# present in the results CSV are skipped.
run_rq2_rq3_experiment(updated_crypt_dataset)