In [7]:
from __future__ import annotations

import csv
import json
from os import listdir
from os.path import isfile
from os.path import join
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd
import torch
from transformers import AutoModel
from transformers import AutoTokenizer

# Disable the SettingWithCopyWarning
pd.options.mode.chained_assignment = None

def participant_fixations_to_df(path_to_txt_file: str) -> pd.DataFrame:
    df_participant_fixations = pd.read_csv(path_to_txt_file, sep='\t')

    # convert str to list
    df_participant_fixations['CURRENT_FIX_INTEREST_AREAS'] = df_participant_fixations[
        'CURRENT_FIX_INTEREST_AREAS'
    ].apply(json.loads)

    # add accuracy column, accuracy defines whether the comprehension task was answered correctly
    # 1 == correct, 0 == wrong
    df_participant_fixations['accuracy'] = df_participant_fixations.apply(
        lambda row: 1 if (row.correct_option == row.KEY_STROKE) else 0, axis=1,
    )

    # keep only fixations on code snippet (remove those on task / answer options)
    df_participant_fixations = df_participant_fixations[df_participant_fixations['CURRENT_FIX_X'].between(430, 1400)]  # noqa: E501

    df_participant_fixations = df_participant_fixations[[
        'RECORDING_SESSION_LABEL', 'code_snippet_id', 'CURRENT_FIX_INDEX', 'CURRENT_FIX_DURATION',
        'CURRENT_FIX_INTEREST_AREAS', 'CURRENT_FIX_NEAREST_INTEREST_AREA', 'accuracy',
        'CS_SUBJ_DIFFICULTY',
    ]]

    return df_participant_fixations


def get_code_snippet_df(
    df_participant_fixations: pd.DataFrame,
    code_snippet_id: str,
) -> pd.DataFrame:
    # select code snippet
    participant_snippet_fixations = df_participant_fixations[
        df_participant_fixations['code_snippet_id'] == code_snippet_id
    ].copy()

    # fix spelling error in data for 10-177-V1/V2
    if len(participant_snippet_fixations) == 0 and code_snippet_id[:6] == '10-177':
        code_snippet_id_fixed = code_snippet_id[:4] + '17-V' + code_snippet_id[-1:]  # 10-117-V

        participant_snippet_fixations = df_participant_fixations[
            df_participant_fixations['code_snippet_id'] == code_snippet_id_fixed
        ].copy()

    return participant_snippet_fixations


def get_IA_df(code_snippet_id: str) -> pd.DataFrame:
    df_IA = pd.read_csv(
        filepath_or_buffer=f'code_snippets/IAs/IA-{code_snippet_id}.ias', skiprows=2, sep='\t',
        quoting=csv.QUOTE_NONE, names=['FORM', 'IA_ID', 'C1', 'C2', 'C3', 'C4', 'LABEL'],
    )
    return df_IA


def code_tokenizer(
    code_snippet_id: str,
    tokenizer: AutoTokenizer,
) -> list:
    # tokenize code
    f = open(f'code_snippets/original_naming/{code_snippet_id}.py')
    code = f.read()

    # cut off 366-143-V1 because it is too long for CodeBERT Input
    if code_snippet_id == '366-143-V1':
        code_tokens = tokenizer.tokenize(code[:-155])
    else:
        code_tokens = tokenizer.tokenize(code)

    return code_tokens


def get_code_token_embeddings(
    code_tokens: np.array,
    tokenizer: AutoTokenizer,
    model: AutoModel,
) -> np.array:
    # covert tokens to ids
    tokens_ids = tokenizer.convert_tokens_to_ids(code_tokens)

    # create code token embeddings
    context_embeddings = model(torch.tensor(tokens_ids)[None, :])[0]

    return context_embeddings


def map_embeddings_to_IAs(code_tokens: list, df_IA: pd.DataFrame) -> pd.DataFrame:
    # map token embedding indices to Interest Areas
    strip_chars = 'ĊĠ '

    l_i = 0
    t_i = 0

    df_tmp = df_IA.copy()
    df_IA['embedding_idxs'] = None

    for token in code_tokens:
        token = str(token).strip(strip_chars)
        df_tmp['LABEL'].iloc[l_i] = str(df_tmp['LABEL'].iloc[l_i]).strip(strip_chars)

        if len(token) == 0:
            # skip whitespace tokens
            t_i += 1
            continue

        if token == df_tmp['LABEL'].iloc[l_i]:
            if df_IA['embedding_idxs'].iloc[l_i] is None:
                df_IA['embedding_idxs'].iloc[l_i] = [t_i]
            else:
                df_IA['embedding_idxs'].iloc[l_i].append(t_i)

            l_i += 1
            t_i += 1

        elif token in df_tmp['LABEL'].iloc[l_i]:
            if df_IA['embedding_idxs'].iloc[l_i] is None:
                df_IA['embedding_idxs'].iloc[l_i] = [t_i]
            else:
                df_IA['embedding_idxs'].iloc[l_i].append(t_i)

            # removed processed token from IA label
            df_tmp['LABEL'].iloc[l_i] = df_tmp['LABEL'].iloc[l_i].replace(token, '', 1)

            if len(df_tmp['LABEL'].iloc[l_i]) == 0:
                # no more tokens left in IA label
                l_i += 1
                t_i += 1
            else:
                # IA label still contains tokens
                t_i += 1

        else:
            raise Exception(f"Error at t_i = {t_i}; l_i = {l_i}; for token {token} and label \n {df_tmp['LABEL'].iloc[l_i - 1]} \n >>> {df_tmp['LABEL'].iloc[l_i]} \n {df_tmp['LABEL'].iloc[l_i + 1]}")  # noqa: E501

    return df_IA.copy()


def map_embeddings_to_fixations(
    participant_snippet_fixations: pd.DataFrame,
    df_mapped_IA: pd.DataFrame,
) -> pd.DataFrame:
    # map embedding indicies to fixations

    participant_snippet_fixations['embedding_idxs'] = None
    for i, (j, row) in enumerate(participant_snippet_fixations.iterrows()):
        # get Interest Area ID or nearest Interest Area ID
        IA_ID = row['CURRENT_FIX_INTEREST_AREAS'][0] if len(row['CURRENT_FIX_INTEREST_AREAS']) > 0 else row['CURRENT_FIX_NEAREST_INTEREST_AREA']  # noqa: E501

        # get embedding_idxs for fixation's Interest Area
        participant_snippet_fixations['embedding_idxs'].iloc[i] = df_mapped_IA[df_mapped_IA['IA_ID'] == int(IA_ID)][  # noqa: E501
            'embedding_idxs'
        ].values.tolist()

        # flatten embedding_idxs Array to 1D
        participant_snippet_fixations['embedding_idxs'] = participant_snippet_fixations['embedding_idxs'].apply(  # noqa: E501
            np.ravel,
        )

    return participant_snippet_fixations.copy()


def compute_input_matrix_row(
    participant_snippet_mapped_fixations: pd.DataFrame, context_embeddings: torch.Tensor,
    add_labels_fix_feature: bool = False,
) -> list[str | int | float]:
    # compute input matrix
    EMBED_VECTOR_SIZE = 768
    input_matrix_row = []
    # input_matrix_row_meta = []

    for i, row in participant_snippet_mapped_fixations.iterrows():
        # init Interest Area embedding
        IA_embedding = np.zeros(EMBED_VECTOR_SIZE)

        # Sum up all token embeddings of respective Interest Area
        for embedding_idx in row['embedding_idxs']:
            # skip Interest Area if its tokens are beyond CodeBert's max length input
            if embedding_idx is None:
                continue
            IA_tk_embedding = context_embeddings[0][embedding_idx].detach().numpy()
            # print(IA_tk_embedding.shape)
            IA_embedding += IA_tk_embedding

        # skip Interest Area if its tokens are beyond CodeBert's max length input
        if np.array_equal(IA_embedding, np.zeros(EMBED_VECTOR_SIZE)):
            continue

        # divide by token amount of Interest area to get an Interest Area embedding
        # that is the average of all it's tokens
        IA_embedding /= len(row['embedding_idxs'])

        IA_ID = int(row['CURRENT_FIX_NEAREST_INTEREST_AREA'])
        FIX_DUR = row['CURRENT_FIX_DURATION']

        input_matrix_row += [IA_ID, FIX_DUR] + IA_embedding.tolist()

    accuracy = participant_snippet_mapped_fixations['accuracy'].iloc[0]
    code_snippet_id = participant_snippet_mapped_fixations['code_snippet_id'].iloc[0]
    participant_id = participant_snippet_mapped_fixations['RECORDING_SESSION_LABEL'].iloc[0]
    subjective_difficulty = participant_snippet_mapped_fixations['CS_SUBJ_DIFFICULTY'].iloc[0]

    max_sequence_length = 1126
    max_sequence_with_embed_len = (EMBED_VECTOR_SIZE + 2) * max_sequence_length
    padded_input_matrix_row = np.pad(input_matrix_row, (0, max_sequence_with_embed_len - len(input_matrix_row)), mode='constant').tolist()
    if add_labels_fix_feature:
        return [code_snippet_id, participant_id, accuracy, subjective_difficulty] + padded_input_matrix_row
    else:
        return padded_input_matrix_row

In [8]:
def compute_embedding_lookup_table(
    IA_table: pd.DataFrame, codebert_embedding: torch.Tensor, graphcodebert_embedding: torch.Tensor,
):
    EMBED_VECTOR_SIZE = 768
    output_df = IA_table.copy()
    output_df["codebert_code_embedding"] = [[] for _ in range(len(output_df))]
    output_df["graphcodebert_code_embedding"] = [[] for _ in range(len(output_df))]

    for i, row in output_df.iterrows():
        # init Interest Area embedding
        IA_embedding_codebert = np.zeros(EMBED_VECTOR_SIZE)
        IA_embedding_graphcodebert = np.zeros(EMBED_VECTOR_SIZE)

        # Sum up all token embeddings of respective Interest Area
        #print(row)

        if row["embedding_idxs"] == None:
            print(f"embedding_idxs was None for {row['LABEL']}")
            IA_embedding_codebert = IA_embedding_graphcodebert = np.zeros(EMBED_VECTOR_SIZE)
            output_df.at[i,"codebert_code_embedding"] = IA_embedding_codebert.tolist()
            output_df.at[i,"graphcodebert_code_embedding"] = IA_embedding_graphcodebert.tolist()

            continue
        else:
            for embedding_idx in row['embedding_idxs']:
                # skip Interest Area if its tokens are beyond CodeBert's max length input
                if embedding_idx is None:
                    continue
                IA_tk_embedding_codebert = codebert_embedding[0][embedding_idx].detach().numpy()
                IA_embedding_graphcodebert = graphcodebert_embedding[0][embedding_idx].detach().numpy()

                IA_embedding_codebert += IA_tk_embedding_codebert
                IA_embedding_graphcodebert += IA_embedding_graphcodebert

        # divide by token amount of Interest area to get an Interest Area embedding
        # that is the average of all it's tokens
        IA_embedding_codebert /= len(row['embedding_idxs'])
        IA_embedding_graphcodebert /= len(row['embedding_idxs'])

        output_df.at[i,"codebert_code_embedding"] = IA_embedding_codebert.tolist()
        output_df.at[i,"graphcodebert_code_embedding"] = IA_embedding_graphcodebert.tolist()

    return output_df


In [38]:
tokenizer = AutoTokenizer.from_pretrained('microsoft/codebert-base')
model = AutoModel.from_pretrained('microsoft/codebert-base')
graph_tokenizer = AutoTokenizer.from_pretrained('microsoft/graphcodebert-base')
graph_model = AutoModel.from_pretrained('microsoft/graphcodebert-base')


fix_report_parent_dir = './raw-fixation-reports'
fix_report_parent_dir_additional = './raw-fixation-reports-add' # additional reports from participants present in fix_report_parent_dir

fix_report_files = [
    f for f in listdir(fix_report_parent_dir) if isfile(join(fix_report_parent_dir, f))
]

code_snippet_ids = set()
participant_ids = set()

embeddings: dict[str, dict[str, list[Any]]] = {}

skip_snippets = [
    '10-181-V3',
    '10-928-V3',
    '142-929-V3',
    '189-1871-V3',
    # '369-1404-V1', # regenerate reports using IAs for first round participants
    '369-1404-V2',
    '49-76-V1',  # missing IA file
    '49-76-V2',  # missing IA file
]

for fix_report_file in fix_report_files:
    participant_id = fix_report_file[-8:-4]

    # build embeddings dict
    participant_ids.add(participant_id)
    if participant_id not in embeddings:
        embeddings[participant_id] = {}

    # get processed Participant Fixation DataFrame
    df_participant_fixations = participant_fixations_to_df(
        f'{fix_report_parent_dir}/{fix_report_file}',
    )

    # check if data from 2nd experiment round exists for participant and add it
    if Path(f'./{fix_report_parent_dir_additional}/fix_report_{participant_id}.txt').exists():
        df_participant_fixations_2 = participant_fixations_to_df(
            f'./{fix_report_parent_dir_additional}/fix_report_{participant_id}.txt',
        )
        df_participant_fixations = pd.concat(
            [df_participant_fixations, df_participant_fixations_2], join='inner',
        )

    # get Participant Code Snippet IDs and remove snippets to be skipped
    participant_code_snippet_ids = set(df_participant_fixations['code_snippet_id'])
    participant_code_snippet_ids = participant_code_snippet_ids - set(skip_snippets)

    for code_snippet_id in participant_code_snippet_ids:
        print(f'Saw {code_snippet_id} from {participant_id}')
        code_snippet_ids.add(code_snippet_id)

IA_LOOKUP_TABLE = {}

# [[codebert_embedding_for_1st_cs, graphcodebert_embedding_for_1st_cs], [codebert_embedding_for_2nd_cs, graphcodebert_embedding_for_2nd_cs], ...]
PRE_COMPUTED_EMBEDDINGS = []
CS_ID_to_IDX_MAP = {}


for i,code_snippet_id in enumerate(code_snippet_ids):
    print(f'Looking at {code_snippet_id} from {participant_id}, idx: {i}')
    # fix spelling error in data for 10-177-V1/V2
    if code_snippet_id[:6] == '10-117':
        code_snippet_id = code_snippet_id[:4] + '77-V' + code_snippet_id[-1:]  # 10-177-V
        print(f'Looking at corrected ID {code_snippet_id}')

    # generate mapping between CS_ID and CS_IDX
    CS_ID_to_IDX_MAP[code_snippet_id] = i

    # get processed participant_snippet_fixations DataFrame
    participant_snippet_fixations = get_code_snippet_df(
        df_participant_fixations, code_snippet_id,
    )

    # get Interest Areas as DataFrame
    df_IA = get_IA_df(code_snippet_id)

    # get tokenized code
    bert_code_tokens = code_tokenizer(
        code_snippet_id, tokenizer,
    )
    graph_bert_code_tokens = code_tokenizer(
        code_snippet_id, graph_tokenizer,
    )

    # get code token embeddings
    bert_context_embeddings = get_code_token_embeddings(
        bert_code_tokens, tokenizer, model,
    )

    graph_bert_context_embeddings = get_code_token_embeddings(
        graph_bert_code_tokens, graph_tokenizer, graph_model,
    )

    # map code token embeddings to Interest Area DataFrame
    bert_df_mapped_IA = map_embeddings_to_IAs(bert_code_tokens, df_IA)
    graph_bert_df_mapped_IA = map_embeddings_to_IAs(graph_bert_code_tokens, df_IA)

    embedding_lookup_table = compute_embedding_lookup_table(bert_df_mapped_IA, bert_context_embeddings, graph_bert_context_embeddings)

    # Iterate over the rows of the DataFrame
    tmp_embed_single_snippet = []
    for _, row in embedding_lookup_table.iterrows():

        # Extract the values from the specified columns
        code_embedding = row["codebert_code_embedding"]
        graphcode_embedding = row["graphcodebert_code_embedding"]

        # Append the arrays as a tuple to the result list
        tmp_embed_single_snippet.append((code_embedding, graphcode_embedding))

    PRE_COMPUTED_EMBEDDINGS.append(tmp_embed_single_snippet)

Some weights of the model checkpoint at microsoft/graphcodebert-base were not used when initializing RobertaModel: ['lm_head.layer_norm.bias', 'lm_head.layer_norm.weight', 'lm_head.bias', 'lm_head.decoder.weight', 'lm_head.decoder.bias', 'lm_head.dense.bias', 'lm_head.dense.weight']
- This IS expected if you are initializing RobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of RobertaModel were not initialized from the model checkpoint at microsoft/graphcodebert-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to

Saw 10-258-V2 from P202
Saw 49-510-V1 from P202
Saw 142-929-V2 from P202
Saw 369-1404-V1 from P202
Saw 10-928-V1 from P202
Saw 366-143-V1 from P202
Saw 10-117-V1 from P202
Saw A34-6 from P001
Saw A49-7086 from P001
Saw A189-895 from P001
Saw A1117-384 from P001
Saw A1117-2696 from P001
Saw A84-600 from P001
Saw 369-1404-V1 from P001
Saw 10-258-V2 from P203
Saw 49-510-V1 from P203
Saw 142-929-V2 from P203
Saw 369-1404-V1 from P203
Saw 10-928-V1 from P203
Saw 366-143-V1 from P203
Saw 10-117-V1 from P203
Saw 10-258-V2 from P201
Saw 49-510-V1 from P201
Saw 142-929-V2 from P201
Saw 369-1404-V1 from P201
Saw 10-928-V1 from P201
Saw 366-143-V1 from P201
Saw 10-117-V1 from P201
Saw 10-258-V2 from P204
Saw 49-510-V1 from P204
Saw 142-929-V2 from P204
Saw 369-1404-V1 from P204
Saw 10-928-V1 from P204
Saw 366-143-V1 from P204
Saw 10-117-V1 from P204
Saw 10-258-V2 from P205
Saw 49-510-V1 from P205
Saw 142-929-V2 from P205
Saw 369-1404-V1 from P205
Saw 10-928-V1 from P205
Saw 366-143-V1 from P205
S

In [55]:
# PAD PRE_COMPUTED_EMBEDDINGS, s.t. all dimensions are of equal length

# Determine the maximum length of the sublists
max_length = max(len(sublist) for sublist in PRE_COMPUTED_EMBEDDINGS)

# Pad each sublist with zeros to make them of consistent length
for sublist in PRE_COMPUTED_EMBEDDINGS:
    zero_embed = [0] * 768
    sublist.extend([[zero_embed, zero_embed]] * (max_length - len(sublist)))

np.array(PRE_COMPUTED_EMBEDDINGS).shape

In [57]:
np.array(PRE_COMPUTED_EMBEDDINGS).shape

(23, 166, 2, 768)

In [58]:
len(CS_ID_to_IDX_MAP) #["10-177-V1"][5]["graphcodebert_code_embedding"]
with open('code_snippet_id_mapping.json', 'w') as f:
    json.dump(CS_ID_to_IDX_MAP, f) #, indent=4)

PRE_COMPUTED_EMBEDDINGS = np.array(PRE_COMPUTED_EMBEDDINGS)
np.save("PRE_COMPUTED_EMBEDDINGS.npy", PRE_COMPUTED_EMBEDDINGS)

In [11]:
CS_ID_to_IDX_MAP

{'10-258-V3': 0,
 'A1117-384': 1,
 '49-510-V1': 2,
 '142-929-V2': 3,
 '10-928-V1': 4,
 '366-143-V1': 5,
 'A49-7086': 6,
 'A1117-2696': 7,
 '49-510-V2': 8,
 '369-1404-V1': 9,
 'A34-6': 10,
 '189-1871-V2': 11,
 '10-181-V2': 12,
 '142-929-V1': 13,
 '49-76-V3': 14,
 '10-177-V1': 15,
 '189-1871-V1': 16,
 '10-258-V2': 17,
 '10-177-V2': 18,
 'A189-895': 19,
 '10-258-V1': 20,
 '10-928-V2': 21,
 'A84-600': 22}

In [59]:
# Load the array from file
loaded_array = np.load("PRE_COMPUTED_EMBEDDINGS_5.npy")
print(loaded_array.shape)

(23, 166, 2, 768)
