In [1]:
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import KFold
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
from sklearn.model_selection import train_test_split
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
from transformers import BertModel, BertTokenizer
from sklearn.linear_model import LinearRegression
from sklearn.feature_extraction.text import CountVectorizer
from torch.utils.data import TensorDataset
import xgboost as xgb
import seaborn as sns
import matplotlib.pyplot as plt

### TTCT dataset

In [None]:
kodetu_creat_test = pd.read_csv('TTCT_AUT_LOG.csv')

In [None]:
kodetu_creat_test.columns

In [None]:
filtered_columns = kodetu_creat_test[['TTCT_PRE_fluency','TTCT_PRE_elaboration','TTCT_PRE_flexibility']]

In [None]:
kodetu_creat_test[(filtered_columns != '.').all(axis=1)]

In [None]:
df = kodetu_creat_test[(filtered_columns != '.').all(axis=1)]

In [None]:
df.head()

### Kodetu interactions dataset

In [None]:
kodetu_data = pd.read_csv('kodetuTau-interactions_pre_post.csv')

In [None]:
# Select all rows where the 'XMLPseudocode' column contains the word "EXECUTE"
mask = kodetu_data["XMLPseudocode()"].str.contains('EXECUTE')
df_filtered = kodetu_data[mask]

In [None]:
df_filtered = df_filtered[~df_filtered["Pseudocode"].isna()]

In [None]:
df_filtered = df_filtered.drop_duplicates() #df_filtered[df_filtered.duplicated(subset=['Pseudocode','Outcome','Level','SessionId'])]

In [None]:
df_filtered.head()

In [None]:
result = df_filtered.copy()

In [None]:
result_level_1 = result #for the basic model

In [None]:
merged_df = pd.merge(result_level_1, df, left_on='SessionId', right_on='User', how='inner')

In [None]:
merged_df.shape

In [None]:
merged_df["Gender"].hist()

In [None]:
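# Optional subgroup filters: uncomment exactly one line to restrict the analysis.
# Later cells use X_train.index / X_test.index as row positions into tensors, so
# after filtering also run: merged_df = merged_df.reset_index(drop=True)
#merged_df  = merged_df[merged_df['Gender']=='1'] #for Male
#merged_df  = merged_df[merged_df['Gender']=='2'] #for Female
#merged_df  = merged_df[merged_df['Outcome']==1] #for Success
#merged_df  = merged_df[merged_df['Outcome']==-1] #for Failure
#merged_df  = merged_df[merged_df['Outcome']==-2] #for Error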

In [None]:
merged_df[['TTCT_PRE_fluency','TTCT_PRE_elaboration','TTCT_PRE_flexibility','TTCT_PRE_originality']] =  merged_df[['TTCT_PRE_fluency','TTCT_PRE_elaboration','TTCT_PRE_flexibility','TTCT_PRE_originality']].astype('float')

In [None]:
target_values = merged_df[['TTCT_PRE_fluency','TTCT_PRE_elaboration','TTCT_PRE_flexibility','TTCT_PRE_originality']]

target_values_normalized = (target_values - target_values.min()) / (target_values.max() - target_values.min())

merged_df[['TTCT_PRE_fluency_norm','TTCT_PRE_elaboration_norm','TTCT_PRE_flexibility_norm','TTCT_PRE_originality_norm']] = target_values_normalized

In [None]:
merged_df['num_unique_words'] = merged_df['Pseudocode'].apply(lambda x: len(set(x.split())))
merged_df['sequence_length'] = merged_df['Pseudocode'].apply(lambda x: len(x.split()))

In [None]:
merged_df[['num_unique_words', 'sequence_length', 'Outcome','Level','Gender']] = merged_df[['num_unique_words', 'sequence_length', 'Outcome','Level','Gender']].astype(int)

In [None]:
# Tabular features and the four normalised TTCT targets.
feature_cols = ['num_unique_words', 'sequence_length', 'Outcome','Level','Gender']
target_cols = ['TTCT_PRE_fluency_norm','TTCT_PRE_flexibility_norm','TTCT_PRE_originality_norm','TTCT_PRE_elaboration_norm']
X = merged_df[feature_cols]
y = merged_df[target_cols]

# 80/20 hold-out split with a fixed seed; its row indices are reused by later cells.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Multi-output linear regression baseline.
model = LinearRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)

# Per-target error metrics (one value per target column).
per_target_mse = mean_squared_error(y_test, y_pred, multioutput='raw_values')
rmse = np.sqrt(per_target_mse)
print("RMSE:", rmse)

mae = mean_absolute_error(y_test, y_pred, multioutput='raw_values')
print("MAE:", mae)


In [None]:
# Gradient-boosted baseline on the same tabular split.
xgb_params = dict(objective='reg:squarederror', n_estimators=100, random_state=42)
model = xgb.XGBRegressor(**xgb_params)
model.fit(X_train, y_train)

# One 4-dimensional prediction per test row.
y_pred = model.predict(X_test)

# Per-target error metrics (one value per target column).
per_target_mse = mean_squared_error(y_test, y_pred, multioutput='raw_values')
rmse = np.sqrt(per_target_mse)
print("RMSE:", rmse)

mae = mean_absolute_error(y_test, y_pred, multioutput='raw_values')
print("MAE:", mae)

In [None]:
### Code2Creativ

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
#BERT

In [None]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')

In [None]:
class BertRegressionModel(nn.Module):
    """Encoder plus a three-layer tanh MLP head that outputs 4 regression values.

    Args:
        bert_model: encoder module. It must expose ``config.hidden_size`` and,
            when called as ``bert_model(input_ids, attention_mask=...)``, return
            an object with a ``pooler_output`` attribute.
        encoding_dim: width of the first hidden layer of the head.
        regression_hidden_dim: width of the second hidden layer of the head.
    """

    def __init__(self, bert_model, encoding_dim, regression_hidden_dim):
        super().__init__()
        self.bert = bert_model
        encoder_width = bert_model.config.hidden_size
        head_layers = [
            nn.Linear(encoder_width, encoding_dim),
            nn.Tanh(),
            nn.Linear(encoding_dim, regression_hidden_dim),
            nn.Tanh(),
            # Output layer for the 4-dimensional regression prediction
            nn.Linear(regression_hidden_dim, 4),
        ]
        self.regression = nn.Sequential(*head_layers)

    def forward(self, input_ids, attention_mask):
        """Return the head's prediction computed from the encoder's pooled output."""
        pooled = self.bert(input_ids, attention_mask=attention_mask).pooler_output
        return self.regression(pooled)

In [None]:
class BertSequenceDataset(Dataset):
    """Index-aligned container of token ids, attention masks and labels.

    Each item is a dict with the keys ``'input_ids'``, ``'attention_mask'``
    and ``'labels'`` taken from the same position of the three inputs.
    """

    def __init__(self, input_ids, attention_masks, labels):
        self.input_ids, self.attention_masks, self.labels = input_ids, attention_masks, labels

    def __len__(self):
        # The dataset length follows the token-id container.
        return len(self.input_ids)

    def __getitem__(self, idx):
        sample = {}
        sample['input_ids'] = self.input_ids[idx]
        sample['attention_mask'] = self.attention_masks[idx]
        sample['labels'] = self.labels[idx]
        return sample

In [None]:
Xtrain_indices = X_train.index

In [None]:
Xtest_indices = X_test.index

In [None]:
sequences = merged_df["Pseudocode"]

In [None]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Ensure sequences are a list of strings
if isinstance(sequences, pd.Series):
    sequences = sequences.tolist()
elif isinstance(sequences, np.ndarray):
    sequences = sequences.tolist()
    
# Tokenize sequences
tokenized_inputs = tokenizer(sequences, padding=True, truncation=True, return_tensors='pt')

In [None]:
# Get input_ids and attention_masks
input_ids = tokenized_inputs['input_ids']
attention_masks = tokenized_inputs['attention_mask']

In [None]:
# Index labels of the tabular split, used below as row positions into the
# tokenizer output.
# NOTE(review): this assumes merged_df still has its default 0..n-1 index — confirm
# if any row filter is applied after the merge.
Xtrain_indices_np = np.array(Xtrain_indices)
Xtest_indices_np = np.array(Xtest_indices)

# Indexing a tensor with an index array already returns a new tensor, so it is
# not re-wrapped in torch.tensor() (that emits a copy-construct UserWarning).
X_train_ids = input_ids[Xtrain_indices_np].to(torch.long)
X_test_ids = input_ids[Xtest_indices_np].to(torch.long)

X_train_masks = attention_masks[Xtrain_indices_np].to(torch.long)
X_test_masks = attention_masks[Xtest_indices_np].to(torch.long)

# Convert labels to tensor
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.float)

# Create training and test datasets
train_dataset = BertSequenceDataset(X_train_ids, X_train_masks, y_train_tensor)
test_dataset = BertSequenceDataset(X_test_ids, X_test_masks, y_test_tensor)

# Create training and test dataloaders (only the training order is shuffled)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Print to confirm everything is set up correctly
print("Train DataLoader size:", len(train_dataloader))
print("Test DataLoader size:", len(test_dataloader))

# Verify the shapes of the tensors
for tensor_name, tensor in (
    ("X_train_ids", X_train_ids),
    ("X_train_masks", X_train_masks),
    ("y_train_tensor", y_train_tensor),
    ("X_test_ids", X_test_ids),
    ("X_test_masks", X_test_masks),
    ("y_test_tensor", y_test_tensor),
):
    print(f"{tensor_name} shape:", tensor.shape)

### Define the model

In [None]:
hidden_size = 32
model = BertRegressionModel(bert_model, encoding_dim=256, regression_hidden_dim=32).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
criterion = nn.MSELoss()

### Train the model

In [None]:
num_epochs = 10
clip_value = 1.0  # Clip gradients at this value

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    for batch in train_dataloader:
        inputs = batch['input_ids'].to(device)
        masks = batch['attention_mask'].to(device)
        targets = batch['labels'].float()  # Ensure targets are float for MSELoss
        
        optimizer.zero_grad()
        outputs = model(inputs, masks)
        loss = criterion(outputs.to(device), targets.to(device))
        loss.backward()
        #torch.nn.utils.clip_grad_norm_(model.parameters(), clip_value)  # Gradient clipping
        optimizer.step()
        epoch_loss += loss.item()
    
    epoch_loss /= len(train_dataloader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

In [None]:
# Evaluation function
def evaluate(model, dataloader):
    """Compute per-target RMSE and MAE of ``model`` over ``dataloader``.

    Args:
        model: module called as ``model(input_ids, attention_mask=...)`` that
            returns one prediction row per input row.
        dataloader: iterable of dict batches with the keys ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'``.

    Returns:
        ``(rmse, mae)``: numpy arrays reduced over the sample axis, i.e. one
        value per target column.

    Raises:
        ValueError: if ``dataloader`` yields no batches.
    """
    model.eval()
    # Run on the device the model lives on; a parameter-free model stays on CPU.
    try:
        model_device = next(model.parameters()).device
    except StopIteration:
        model_device = torch.device("cpu")

    actual_batches = []
    prediction_batches = []
    with torch.no_grad():
        # Iterate the loader that was passed in, not a notebook-level global.
        for batch in dataloader:
            inputs = batch['input_ids'].to(model_device)
            masks = batch['attention_mask'].to(model_device)
            targets = batch['labels'].float()

            outputs = model(inputs, attention_mask=masks)
            actual_batches.append(targets.cpu().numpy())
            prediction_batches.append(outputs.cpu().numpy())

    if not actual_batches:
        raise ValueError("dataloader yielded no batches")

    actuals = np.concatenate(actual_batches, axis=0)
    predictions = np.concatenate(prediction_batches, axis=0)

    errors = actuals - predictions
    rmse = np.sqrt(np.mean(errors ** 2, axis=0))
    mae = np.mean(np.abs(errors), axis=0)

    return rmse, mae

# Score the fine-tuned model on the held-out split and report both metrics.
rmse, mae = evaluate(model, test_dataloader)
for metric_name, metric_value in (("RMSE", rmse), ("MAE", mae)):
    print(f"{metric_name}: {metric_value}")

In [None]:
#HuberLoss

In [None]:
# Fresh pretrained encoder for this run: `bert_model` is the encoder inside the
# MSE model above and was updated in place by that model's optimizer, so reusing
# it would start this experiment from already fine-tuned weights.
huber_bert = BertModel.from_pretrained('bert-base-uncased')
model = BertRegressionModel(huber_bert, encoding_dim=128, regression_hidden_dim=64).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
criterion = nn.SmoothL1Loss()

num_epochs = 10
# Clip gradients at this value
clip_value = 1.0

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    # train_dataloader yields dict batches (see BertSequenceDataset.__getitem__);
    # tuple-unpacking a dict would bind the key strings, not the tensors.
    for batch in train_dataloader:
        inputs = batch['input_ids'].to(device)
        masks = batch['attention_mask'].to(device)
        targets = batch['labels'].float().to(device)

        optimizer.zero_grad()
        outputs = model(inputs, masks)
        loss = criterion(outputs, targets)
        loss.backward()
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip_value)
        optimizer.step()
        epoch_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    epoch_loss /= len(train_dataloader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}")

In [None]:
# Score the Huber-trained model on the held-out split.
rmse, mae = evaluate(model.to(device), test_dataloader)
for metric_name, metric_value in (("RMSE", rmse), ("MAE", mae)):
    print(f"{metric_name}: {metric_value}")

### Bag of words

In [None]:
class BOWRegressionModel(nn.Module):
    """Three-layer tanh MLP mapping a bag-of-words vector to 4 regression values.

    Args:
        input_dim: width of the input vector.
        encoding_dim: width of the first hidden layer.
        regression_hidden_dim: width of the second hidden layer.
    """

    def __init__(self, input_dim, encoding_dim, regression_hidden_dim):
        super().__init__()
        mlp_layers = [
            nn.Linear(input_dim, encoding_dim),
            nn.Tanh(),
            nn.Linear(encoding_dim, regression_hidden_dim),
            nn.Tanh(),
            # Output layer for the 4-dimensional regression prediction
            nn.Linear(regression_hidden_dim, 4),
        ]
        self.regression = nn.Sequential(*mlp_layers)

    def forward(self, input_bow):
        """Apply the MLP to ``input_bow`` and return its output."""
        return self.regression(input_bow)

# The network receives one bag-of-words row per sequence, so its input width is
# the CountVectorizer vocabulary size, not the number of training rows.
# A default CountVectorizer fitted on the same `sequences` yields the same
# vocabulary as the vectorizer used to build the BOW matrix.
input_dim = len(CountVectorizer().fit(sequences).vocabulary_)
encoding_dim = 128
regression_hidden_dim = 64
bow_model = BOWRegressionModel(input_dim, encoding_dim, regression_hidden_dim)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(bow_model.parameters(), lr=0.001)

In [None]:
# Token-count matrix over all sequences (one row per sequence, one column per
# vocabulary entry).
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(sequences)

# Dense float tensor of the counts.
X_bow = torch.tensor(X.toarray(), dtype=torch.float)

# Go through numpy so torch receives a plain integer array rather than a pandas Index.
Xtrain_indices_tensor = torch.as_tensor(np.asarray(Xtrain_indices), dtype=torch.long)
Xtest_indices_tensor = torch.as_tensor(np.asarray(Xtest_indices), dtype=torch.long)

# Index-tensor selection already returns new float tensors; wrapping them in
# torch.tensor() again only triggers a copy-construct UserWarning.
X_train_bow = X_bow[Xtrain_indices_tensor]
X_test_bow = X_bow[Xtest_indices_tensor]

y_train_tensor = torch.tensor(y_train.values, dtype=torch.float)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.float)

train_dataset_bow = TensorDataset(X_train_bow, y_train_tensor)
test_dataset_bow = TensorDataset(X_test_bow, y_test_tensor)

# Only the training order is shuffled.
train_dataloader_bow = DataLoader(train_dataset_bow, batch_size=32, shuffle=True)
test_dataloader_bow = DataLoader(test_dataset_bow, batch_size=32, shuffle=False)

In [None]:
num_epochs = 10
clip_value = 1.0

for epoch in range(num_epochs):
    bow_model.train()
    running_loss = 0.0
    for inputs, targets in train_dataloader_bow:
        # Inputs are fed to the model as delivered by the loader; only the
        # targets and the model output are moved to `device` for the loss.
        targets = targets.to(device)
        optimizer.zero_grad()
        outputs = bow_model(inputs)
        loss = criterion(outputs.to(device), targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(bow_model.parameters(), clip_value)
        optimizer.step()
        running_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    epoch_loss = running_loss / len(train_dataloader_bow)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}")

In [None]:
def evaluate(model, dataloader):
    """Return per-target ``(rmse, mae, r2)`` of ``model`` over ``dataloader``.

    ``dataloader`` must yield ``(inputs, targets)`` pairs. Inputs are passed to
    the model unchanged; metrics are reduced over the sample axis.
    """
    model.eval()
    actuals, predictions = [], []
    with torch.no_grad():
        for batch_inputs, batch_targets in dataloader:
            batch_targets = batch_targets.to(device)
            batch_outputs = model(batch_inputs)
            actuals.extend(batch_targets.cpu().numpy())
            predictions.extend(batch_outputs.cpu().numpy())

    actuals = np.array(actuals)
    predictions = np.array(predictions)

    errors = actuals - predictions
    rmse = np.sqrt(np.mean(errors ** 2, axis=0))
    mae = np.mean(np.abs(errors), axis=0)
    # R^2 = 1 - residual sum of squares / total sum of squares, per target column.
    residual_ss = np.sum(errors ** 2, axis=0)
    total_ss = np.sum((actuals - np.mean(actuals, axis=0)) ** 2, axis=0)
    r2 = 1 - residual_ss / total_ss

    return rmse, mae, r2

In [None]:
# The bag-of-words evaluate() returns three arrays (rmse, mae, r2); unpacking
# only two raises "too many values to unpack".
rmse, mae, r2 = evaluate(bow_model, test_dataloader_bow)
print(f"RMSE: {rmse}")
print(f"MAE: {mae}")
print(f"R²: {r2}")

In [None]:
### Random Embedding

In [None]:
class SequenceDataset(Dataset):
    """Pairs each sequence with the target stored at the same position."""

    def __init__(self, sequences, targets):
        self.sequences, self.targets = sequences, targets

    def __len__(self):
        # The dataset length follows the sequence container.
        return len(self.sequences)

    def __getitem__(self, idx):
        sequence = self.sequences[idx]
        target = self.targets[idx]
        return sequence, target

In [None]:
# Random-embedding experiment: every sequence is replaced by Gaussian noise of
# shape (seq_length, embedding_dim) while the real targets and the same
# train/test row split are kept.
num_epochs = 20
clip_value = 1.0  # Clip gradients at this value


batch_size = 32

actual_targets = merged_df[['TTCT_PRE_fluency_norm','TTCT_PRE_flexibility_norm','TTCT_PRE_originality_norm','TTCT_PRE_elaboration_norm']].values
num_sequences = len(actual_targets)  # Number of sequences
# NOTE(review): `sequences` is a list of pseudocode strings at this point, so
# len(seq) counts characters, not whitespace tokens — confirm this is intended.
max_seq_length = max(len(seq) for seq in sequences)
seq_length = max_seq_length       # Length of each sequence
embedding_dim = 20    # Dimension of each embedding
output_dim = 4        # Dimension of the target output

# Generate random sequences from a normal distribution
# NOTE(review): no NumPy seed is set in this cell, so the noise differs between runs.
random_sequences = np.random.randn(num_sequences, seq_length, embedding_dim)
#random_input = torch.randint(0, vocab_size, (batch_size, seq_length), dtype=torch.long)

# X_train / X_test are rebound here: they previously held the tabular feature
# DataFrames from train_test_split and now hold the random arrays.
X_train = random_sequences[Xtrain_indices_np]
X_test = random_sequences[Xtest_indices_np]

# Convert the data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)

# train_dataset / test_dataset and the two dataloaders below replace the
# BERT versions created earlier under the same names.
train_dataset = SequenceDataset(X_train, y_train_tensor)
test_dataset = SequenceDataset(X_test, y_test_tensor)

# Create training and test dataloaders
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# NOTE(review): neither `GRUAutoencoderWithRegression` nor `vocab_size` is defined
# in any cell visible in this notebook; on a fresh kernel this line raises
# NameError unless they are provided elsewhere — TODO confirm / restore the cell.
# `encoding_dim` and `regression_hidden_dim` are the values set in the
# bag-of-words section.
model = GRUAutoencoderWithRegression(vocab_size, embedding_dim, encoding_dim, regression_hidden_dim)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Train the random-embedding model; the reported loss is the mean per-batch
# loss of each epoch.
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    for inputs, targets in train_dataloader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets.float())
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip_value)  # Gradient clipping
        optimizer.step()
        epoch_loss += loss.item()

    # Average over the loader that was actually iterated above; the previous
    # `len(dataloader)` referred to a name this cell never defines.
    epoch_loss /= len(train_dataloader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}")

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

def evaluate(model, dataloader):
    """Return per-target ``(rmse, mae, r2)`` of ``model`` over ``dataloader``.

    ``dataloader`` must yield ``(inputs, targets)`` pairs; inputs are cast to
    float before being passed to the model. Each metric is computed with
    ``multioutput='raw_values'``, i.e. one value per target column.
    """
    model.eval()
    target_chunks, prediction_chunks = [], []
    with torch.no_grad():
        for batch_inputs, batch_targets in dataloader:
            batch_outputs = model(batch_inputs.float())
            target_chunks.append(batch_targets.numpy())
            prediction_chunks.append(batch_outputs.numpy())

    # Stack the per-batch arrays into (n_samples, n_targets) matrices.
    actuals = np.vstack(target_chunks)
    predictions = np.vstack(prediction_chunks)

    per_target = dict(multioutput='raw_values')
    rmse = np.sqrt(mean_squared_error(actuals, predictions, **per_target))
    mae = mean_absolute_error(actuals, predictions, **per_target)
    r2 = r2_score(actuals, predictions, **per_target)

    return rmse, mae, r2

# Score the random-embedding model on the held-out split and report all metrics.
rmse, mae, r2 = evaluate(model, test_dataloader)
for metric_name, metric_value in (("RMSE", rmse), ("MAE", mae), ("R²", r2)):
    print(f"{metric_name}: {metric_value}")

In [None]:
## RNN

In [None]:
class SequenceRegressor(nn.Module):
    """Embedding -> two stacked LSTMs -> linear head over the final hidden state.

    Args:
        vocab_size: number of rows of the embedding table.
        embedding_dim: size of each token embedding.
        max_sequence_length: accepted for interface compatibility; not used by
            the layers defined here.
        units: hidden size of both LSTM layers.
        output_dim: number of regression outputs (default 4).
    """

    def __init__(self, vocab_size, embedding_dim, max_sequence_length, units, output_dim=4):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm1 = nn.LSTM(embedding_dim, units, batch_first=True)
        self.lstm2 = nn.LSTM(units, units, batch_first=True)
        self.fc = nn.Linear(units, output_dim)

    def forward(self, input_sequences):
        """Map integer token indices to ``output_dim`` values per sequence."""
        token_vectors = self.embedding(input_sequences)
        first_pass, _ = self.lstm1(token_vectors)
        _, (final_hidden, _) = self.lstm2(first_pass)
        # Take the last hidden state of the second LSTM
        return self.fc(final_hidden[-1])

# Hidden size shared by both LSTM layers of SequenceRegressor.
units = 64

# NOTE(review): `vocab` is not defined in any cell visible in this notebook; on a
# fresh kernel this line raises NameError unless it is provided elsewhere —
# TODO confirm / restore the vocabulary-building cell.
# `embedding_dim` and `max_seq_length` are the values set in the random-embedding cell.
model = SequenceRegressor(len(vocab) + 1, embedding_dim, max_seq_length, units)

In [None]:
# Mean-squared-error loss and Adam over the parameters of the model built above.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Train the LSTM regressor; the reported loss is the mean per-batch loss of each epoch.
num_epochs = 20
clip_value = 1.0  # Clip gradients at this value

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    # NOTE(review): `dataloader` is not defined in any cell visible in this
    # notebook. The model starts with an nn.Embedding layer, so it presumably
    # needs a loader of integer token-index batches — TODO confirm which loader
    # is meant here.
    for inputs, targets in dataloader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets.float())
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip_value)  # Gradient clipping
        optimizer.step()
        epoch_loss += loss.item()
    
    epoch_loss /= len(dataloader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}")

In [None]:
def evaluate(model, dataloader):
    """Return per-target ``(rmse, mae)`` of ``model`` over ``dataloader``.

    ``dataloader`` must yield ``(inputs, targets)`` pairs; inputs are passed to
    the model unchanged. Both metrics are computed with
    ``multioutput='raw_values'``, i.e. one value per target column.
    """
    model.eval()
    target_chunks, prediction_chunks = [], []
    with torch.no_grad():
        for batch_inputs, batch_targets in dataloader:
            batch_outputs = model(batch_inputs)
            target_chunks.append(batch_targets.numpy())
            prediction_chunks.append(batch_outputs.numpy())

    # Stack the per-batch arrays into (n_samples, n_targets) matrices.
    actuals = np.vstack(target_chunks)
    predictions = np.vstack(prediction_chunks)

    per_target = dict(multioutput='raw_values')
    rmse = np.sqrt(mean_squared_error(actuals, predictions, **per_target))
    mae = mean_absolute_error(actuals, predictions, **per_target)

    return rmse, mae

In [None]:
# Evaluate the model
# NOTE(review): `dataloader` is not defined in any visible cell, and it is the
# same name the training loop iterates — if both refer to one loader, these
# metrics are measured on training data rather than a held-out split. TODO confirm.
rmse, mae  = evaluate(model, dataloader)
print(f"RMSE: {rmse}")
print(f"MAE: {mae}")