In [21]:
import torch
import pandas as pd
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModel


# Set pandas to show full column content
pd.set_option('display.max_colwidth', None)
# Read data
df = pd.read_json("../data/codegptsensor/python/train.jsonl", lines=True)

##### Creating a small sample dataset

In [22]:
df_small = df.sample(n=1000, random_state=42)

# orient="records": Each DataFrame row becomes a JSON object. Without lines=True, it outputs a JSON array of objects.
# lines=True: Writes each record on its own line (JSONL), one JSON object per line.
df_small.to_json("../data/codegptsensor/python/train_small.jsonl", orient="records", lines=True)

##### Verifying the small dataset


In [23]:
df_small = pd.read_json("../data/codegptsensor/python/train_small.jsonl", lines=True)
df_small.head(3)


Unnamed: 0,index,code,contrast,label
0,gp130060,"def save_file(filename, data, mk_parents=True):\n """"""Save file to disk.\n Paramaters\n ----------\n filename : pathlib.Path\n Path to the file.\n data : str\n File contents.\n mk_parents : bool, optional\n If to create parent directories.\n """"""\n parent = filename.parent\n if not parent.exists() and mk_parents:\n logger.debug(""Creating directory: %s"", parent.as_posix())\n parent.mkdir(parents=True)\n with open(filename, mode=""w"") as f:\n logger.debug(""Saving file: %s"", filename.as_posix())\n f.write(data)","import pathlib\n\ndef save_to_disk(filename: pathlib.Path, data: str, mk_parents: bool = False) -> None:\n if mk_parents:\n filename.parent.mkdir(parents=True, exist_ok=True)\n with open(filename, 'w') as f:\n f.write(data)\n",0
1,gp191806,"import functools\nimport logging\n\ndef wrap_callbacks(callback_fn):\n @functools.wraps(callback_fn)\n def wrapper(*args, **kwargs):\n try:\n return callback_fn(*args, **kwargs)\n except Exception as e:\n logging.exception(f""Error in callback: {e}"")\n return ""An error occurred in the callback""\n return wrapper\n","def dont_crash(fn):\n """"""\n Wraps callbacks: a simple information is raised in place of a program crash.\n """"""\n def safe_exec(self, *args, **kwargs):\n try:\n return fn(self, *args, **kwargs)\n except Exception as e:\n logging.exception(e)\n QMessageBox.information(\n self, type(e).__name__, "" "".join(str(x) for x in e.args)\n )\n return safe_exec",1
2,gp166948,"def normalizeGlyphUnicodes(value):\n """"""\n Normalizes glyph unicodes.\n * **value** must be a ``list``.\n * **value** items must normalize as glyph unicodes with\n :func:`normalizeGlyphUnicode`.\n * **value** must not repeat unicode values.\n * Returned value will be a ``tuple`` of ints.\n """"""\n if not isinstance(value, (tuple, list)):\n raise TypeError(""Glyph unicodes must be a list, not %s.""\n % type(value).__name__)\n values = [normalizeGlyphUnicode(v) for v in value]\n duplicates = [v for v, count in Counter(value).items() if count > 1]\n if len(duplicates) != 0:\n raise ValueError(""Duplicate unicode values are not allowed."")\n return tuple(values)","def normalize_glyph_unicodes(value):\n """"""\n Normalizes glyph unicodes.\n * **value** must be a ``list``.\n * **value** items must normalize as glyph unicodes with\n :func:`normalizeGlyphUnicode`.\n * **value** must not repeat unicode values.\n * Returned value will be a ``tuple`` of ints.\n """"""\n from fontTools.misc.transform import Transform\n glyphs = []\n for glyph in value:\n glyph_norm = normalizeGlyphUnicode(glyph)\n if glyph_norm not in glyphs:\n glyphs.append(glyph_norm)\n return tuple(map(ord, glyphs))\n",0


##### Load UniXcoder

In [24]:
# A tokenizer converts code (text) into numbers that the model can process.
tokenizer = AutoTokenizer.from_pretrained("microsoft/unixcoder-base")
# Unixcoder is a neural network that takes numbers and outputs predictions.
model = AutoModel.from_pretrained("microsoft/unixcoder-base")


print("✓ Model loaded successfully!")
print(f"Model type: {type(model)}")
print(f"Model size: {sum(p.numel() for p in model.parameters()) / 1e6:.2f}M parameters")

✓ Model loaded successfully!
Model type: <class 'transformers.models.roberta.modeling_roberta.RobertaModel'>
Model size: 125.93M parameters


##### Test with a simple code snippet

In [25]:
test_code = "def hello():\n    print('Hello world')"
inputs = tokenizer(test_code, return_tensors="pt", truncation=True, max_length=512)

print("\n✓ Tokenizer working!")
print(f"Input shape: {inputs['input_ids'].shape}")


✓ Tokenizer working!
Input shape: torch.Size([1, 12])


##### Embeddings

In [26]:
with torch.no_grad():
    outputs = model(**inputs)
    embeddings = outputs.last_hidden_state


print(f"✓ Model forward pass working!")
print(f"Embedding shape: {embeddings.shape}")

✓ Model forward pass working!
Embedding shape: torch.Size([1, 12, 768])


##### Input and output splitting

In [27]:
# Create data for both columns
human_samples = []
ai_samples = []
for _, row in df_small.iterrows():
    if row['label'] == 0:
        human_samples.append(row['code'])
        ai_samples.append(row['contrast'])
    else:
        ai_samples.append(row['code'])
        human_samples.append(row['contrast'])

# Create a label list (same length as the samples)
samples = human_samples + ai_samples
labels = [0]*len(human_samples) + [1]*len(ai_samples)

print(f"Total samples: {len(samples)} (Human: {len(human_samples)}, AI: {len(ai_samples)})")
print(f"Label counts: {pd.Series(labels).value_counts().to_dict()}")

Total samples: 2000 (Human: 1000, AI: 1000)
Label counts: {0: 1000, 1: 1000}


##### Generate embeddings

In [28]:
model.eval()  # Set model to evaluation mode

embeddings = []
batch_size = 32  # Small batch for safety

with torch.no_grad():
    for i in tqdm(range(0, len(samples), batch_size)):
        batch_text = samples[i:i+batch_size]
        # Tokenize and pad
        inputs = tokenizer(batch_text, padding='max_length', truncation=True, max_length=256, return_tensors="pt")
        # Forward pass
        outputs = model(**inputs)
        # Use the [CLS] token (first in the sequence) as embedding
        batch_emb = outputs.last_hidden_state[:, 0, :].cpu()  # shape: [batch, 768]
        embeddings.append(batch_emb)

# Concatenate over all batches
X = torch.cat(embeddings, dim=0).numpy()
y = labels

print(f"X shape: {X.shape}")  # Should be (2000, 768)
print(f"y length: {len(y)}")


100%|██████████| 63/63 [01:47<00:00,  1.71s/it]

X shape: (2000, 768)
y length: 2000





##### Train a Simple Classifier

##### Neural Network Training

In [29]:
import torch.nn as nn

class CodeClassifier(nn.Module):
    def __init__(self, input_dim=768, hidden_dim=256, num_classes=2):
        super(CodeClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(hidden_dim, num_classes)
    
    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Create the model
classifier = CodeClassifier()
print(classifier)
print(f"Total parameters: {sum(p.numel() for p in classifier.parameters())}")


CodeClassifier(
  (fc1): Linear(in_features=768, out_features=256, bias=True)
  (relu): ReLU()
  (dropout): Dropout(p=0.3, inplace=False)
  (fc2): Linear(in_features=256, out_features=2, bias=True)
)
Total parameters: 197378


## Paper Recreation

In [30]:
import torch
from torch.utils.data import TensorDataset, DataLoader

# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.LongTensor(y_train)
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.LongTensor(y_test)

# Create datasets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

print(f"Train batches: {len(train_loader)}")
print(f"Test batches: {len(test_loader)}")


Train batches: 50
Test batches: 13


In [31]:
import torch.optim as optim
from tqdm import tqdm

# Training setup
criterion = nn.CrossEntropyLoss()  # Standard loss for classification
optimizer = optim.Adam(classifier.parameters(), lr=0.001)
num_epochs = 10

# Training loop
for epoch in range(num_epochs):
    classifier.train()
    train_loss = 0
    correct = 0
    total = 0
    
    for batch_X, batch_y in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        # Forward pass
        outputs = classifier(batch_X)
        loss = criterion(outputs, batch_y)
        
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Track metrics
        train_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()
    
    train_acc = 100 * correct / total
    print(f"Epoch {epoch+1}: Loss = {train_loss/len(train_loader):.4f}, Accuracy = {train_acc:.2f}%")


Epoch 1/10: 100%|██████████| 50/50 [00:00<00:00, 752.25it/s]


Epoch 1: Loss = 0.4709, Accuracy = 76.31%


Epoch 2/10: 100%|██████████| 50/50 [00:00<00:00, 1336.18it/s]


Epoch 2: Loss = 0.2929, Accuracy = 87.38%


Epoch 3/10: 100%|██████████| 50/50 [00:00<00:00, 1231.29it/s]


Epoch 3: Loss = 0.2221, Accuracy = 91.56%


Epoch 4/10: 100%|██████████| 50/50 [00:00<00:00, 1243.66it/s]


Epoch 4: Loss = 0.1635, Accuracy = 93.44%


Epoch 5/10: 100%|██████████| 50/50 [00:00<00:00, 1311.40it/s]


Epoch 5: Loss = 0.1351, Accuracy = 95.00%


Epoch 6/10: 100%|██████████| 50/50 [00:00<00:00, 1327.56it/s]


Epoch 6: Loss = 0.0902, Accuracy = 97.19%


Epoch 7/10: 100%|██████████| 50/50 [00:00<00:00, 1379.50it/s]


Epoch 7: Loss = 0.0746, Accuracy = 97.06%


Epoch 8/10: 100%|██████████| 50/50 [00:00<00:00, 1362.25it/s]


Epoch 8: Loss = 0.0542, Accuracy = 98.38%


Epoch 9/10: 100%|██████████| 50/50 [00:00<00:00, 1233.68it/s]


Epoch 9: Loss = 0.0408, Accuracy = 99.06%


Epoch 10/10: 100%|██████████| 50/50 [00:00<00:00, 1452.81it/s]

Epoch 10: Loss = 0.0453, Accuracy = 98.75%





In [32]:
classifier.eval()
correct = 0
total = 0
all_preds = []
all_labels = []

with torch.no_grad():
    for batch_X, batch_y in test_loader:
        outputs = classifier(batch_X)
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()
        all_preds.extend(predicted.numpy())
        all_labels.extend(batch_y.numpy())

test_acc = 100 * correct / total
print(f"\nTest Accuracy: {test_acc:.2f}%")

# Detailed metrics
from sklearn.metrics import classification_report
print(classification_report(all_labels, all_preds, target_names=['Human', 'AI']))



Test Accuracy: 85.50%
              precision    recall  f1-score   support

       Human       0.84      0.87      0.86       199
          AI       0.87      0.84      0.85       201

    accuracy                           0.85       400
   macro avg       0.86      0.86      0.85       400
weighted avg       0.86      0.85      0.85       400



In [33]:
# Load the small dataset
df_small = pd.read_json('../data/test_dataset_small.jsonl', lines=True)

# Create triplets
triplets = []

for _, row in df_small.iterrows():
    code = row['code']
    contrast = row['contrast']
    label = row['label']
    
    # Store the pair with labels
    if label == 0:
        # code is human, contrast is AI
        triplets.append({
            'human': code,
            'ai': contrast
        })
    else:
        # code is AI, contrast is human
        triplets.append({
            'human': contrast,
            'ai': code
        })

print(f"✓ Created {len(triplets)} pairs for contrastive learning")
print("\nExample pair:")
print(f"Human code: {triplets[0]['human'][:100]}...")
print(f"AI code: {triplets[0]['ai'][:100]}...")


✓ Created 1000 pairs for contrastive learning

Example pair:
Human code: def save_file(filename, data, mk_parents=True):
    """Save file to disk.
    Paramaters
    -------...
AI code: import pathlib

def save_to_disk(filename: pathlib.Path, data: str, mk_parents: bool = False) -> Non...


In [34]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ContrastiveClassifier(nn.Module):
    def __init__(self, input_dim=768, hidden_dim=256):
        super(ContrastiveClassifier, self).__init__()
        # Projection head for contrastive learning
        self.projection = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, 128)  # Project to 128-dim space
        )
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, 2)
        )
    
    def forward(self, x, return_projection=False):
        if return_projection:
            return self.projection(x)
        return self.classifier(x)

# Create model
contrastive_model = ContrastiveClassifier()
print(contrastive_model)
print(f"Total parameters: {sum(p.numel() for p in contrastive_model.parameters())}")


ContrastiveClassifier(
  (projection): Sequential(
    (0): Linear(in_features=768, out_features=256, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.3, inplace=False)
    (3): Linear(in_features=256, out_features=128, bias=True)
  )
  (classifier): Sequential(
    (0): Linear(in_features=768, out_features=256, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.3, inplace=False)
    (3): Linear(in_features=256, out_features=2, bias=True)
  )
)
Total parameters: 427138


In [35]:
class ContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.5):
        super(ContrastiveLoss, self).__init__()
        self.temperature = temperature
    
    def forward(self, human_proj, ai_proj):
        # Normalize embeddings
        human_proj = F.normalize(human_proj, dim=1)
        ai_proj = F.normalize(ai_proj, dim=1)
        
        # Compute similarity
        similarity = torch.matmul(human_proj, ai_proj.T) / self.temperature
        
        # Labels: diagonal elements are positive pairs
        batch_size = human_proj.shape[0]
        labels = torch.arange(batch_size).to(human_proj.device)
        
        # Cross-entropy loss
        loss = F.cross_entropy(similarity, labels)
        return loss

contrastive_criterion = ContrastiveLoss(temperature=0.5)
print("✓ Contrastive loss function created")


✓ Contrastive loss function created


In [36]:
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

class PairedCodeDataset(Dataset):
    def __init__(self, triplets, tokenizer, unixcoder_model, max_length=256):
        self.triplets = triplets
        self.tokenizer = tokenizer
        self.unixcoder = unixcoder_model  # This should be UniXcoder
        self.max_length = max_length
        
        # Pre-compute embeddings for speed
        print("Pre-computing embeddings...")
        self.human_embeddings = []
        self.ai_embeddings = []
        
        self.unixcoder.eval()
        with torch.no_grad():
            for pair in tqdm(triplets):
                # Get human code embedding
                h_inputs = tokenizer(pair['human'], padding='max_length', 
                                    truncation=True, max_length=max_length, 
                                    return_tensors="pt")
                h_output = self.unixcoder(**h_inputs)
                h_emb = h_output.last_hidden_state[:, 0, :].squeeze()
                
                # Get AI code embedding
                a_inputs = tokenizer(pair['ai'], padding='max_length', 
                                    truncation=True, max_length=max_length, 
                                    return_tensors="pt")
                a_output = self.unixcoder(**a_inputs)
                a_emb = a_output.last_hidden_state[:, 0, :].squeeze()
                
                self.human_embeddings.append(h_emb)
                self.ai_embeddings.append(a_emb)
    
    def __len__(self):
        return len(self.triplets)
    
    def __getitem__(self, idx):
        return self.human_embeddings[idx], self.ai_embeddings[idx]

# Create dataset - PASS THE CORRECT MODEL
# 'model' should be your UniXcoder model (the one you loaded earlier)
paired_dataset = PairedCodeDataset(triplets, tokenizer, model, max_length=256)
paired_loader = DataLoader(paired_dataset, batch_size=32, shuffle=True)

print(f"✓ Dataset ready: {len(paired_dataset)} pairs")


Pre-computing embeddings...


100%|██████████| 1000/1000 [02:31<00:00,  6.61it/s]

✓ Dataset ready: 1000 pairs



