# Basic Prompt Word Choice Analysis 

This notebook contains the code to (1) generate tf-idf values and visualizations and (2) tokenize prompt text.


The approach for running `TfidfVectorizer` with a bespoke tokenizer was inspired by: https://www.davidsbatista.net/blog/2018/02/28/TfidfVectorizer/

Code for generating the by-subset tfidf visualizations is taken directly from: https://buhrmann.github.io/tfidf-analysis.html

Import `interactions.csv`.

In [None]:
import pandas as pd 
import re
import string

prompt_data_raw = pd.read_csv("../raw_data/interactions.csv")
prompt_data_raw.head()

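Combine the four prompt categories (`First Success`, `Last Success`, `First Failure`, `Last Failure`) into a single `attempt` column, which `top_feats_by_class` uses later. The flags are checked in that order, so a row with several flags set gets the first matching label, and a row with none gets `Other`.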

In [None]:
def create_attempt(row):
    if row['is_first_success']:
        row['attempt']= "First Success"
    elif row['is_last_success']:
        row['attempt']= "Last Success"
    elif row['is_first_failure']:
        row['attempt']= "First Failure"
    elif row['is_last_failure']:
        row['attempt']= "Last Failure"
    else: 
        row['attempt']= "Other"
    return row

prompt_data_raw = prompt_data_raw.apply(create_attempt, axis=1)
len(prompt_data_raw)
prompt_data_raw.head()
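
The same labeling can be sketched without a row-wise `apply`, using `np.select`, which also takes the first matching condition. The toy flags below are hypothetical; the real ones come from `interactions.csv`, and this assumes the flag columns are boolean.

```python
import numpy as np
import pandas as pd

# Hypothetical toy rows, one per expected label.
toy = pd.DataFrame({
    "is_first_success": [True, False, False, False, False],
    "is_last_success":  [True, True, False, False, False],
    "is_first_failure": [False, False, True, False, False],
    "is_last_failure":  [False, False, True, True, False],
})

# Conditions are tested in order, mirroring the if/elif chain in create_attempt.
conditions = [toy["is_first_success"], toy["is_last_success"],
              toy["is_first_failure"], toy["is_last_failure"]]
choices = ["First Success", "Last Success", "First Failure", "Last Failure"]
toy["attempt"] = np.select(conditions, choices, default="Other")
print(toy["attempt"].tolist())
```

Row 0 has both success flags set and is labeled `First Success`, which matches the precedence in `create_attempt`.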

We also need to drop the prompts listed in `exclude.csv`. The code below flags them, as per `compute_pass1.ipynb`.

In [None]:
excludes_df = pd.read_csv("../computed_data/exclude.csv")

excludes_by_problem = excludes_df.groupby("problem").agg({"text": list}).reset_index()
excludes_by_problem = {row["problem"]: row["text"] for _, row in excludes_by_problem.iterrows()}

def string_inclusion_ignore_punctuation(s1, s2):
    # Remove punctuation
    s1 = s1.translate(str.maketrans('', '', string.punctuation))
    s2 = s2.translate(str.maketrans('', '', string.punctuation))
    # Check for inclusion, ignoring leading/trailing whitespace (including newlines) in s1
    return s1.strip() in s2

def do_exclude(item):
    problem = item["problem"]
    text = item["prompt"]
    if problem in excludes_by_problem:
        for exclude in excludes_by_problem[problem]:
            if string_inclusion_ignore_punctuation(exclude, text):
                return True
    return False
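
A small illustration of how the inclusion check behaves, using a condensed copy of the helper and made-up strings (not taken from `exclude.csv`). It suggests that the match ignores punctuation and surrounding whitespace but stays case-sensitive.

```python
import string

def string_inclusion_ignore_punctuation(s1, s2):
    # Condensed copy of the helper above: strip punctuation, then test containment.
    table = str.maketrans('', '', string.punctuation)
    return s1.translate(table).strip() in s2.translate(table)

# Hypothetical excluded text and prompt.
excluded = "adds two numbers, then doubles.\n"
prompt = 'def f(a, b):\n    """adds two numbers then doubles"""'

same_text = string_inclusion_ignore_punctuation(excluded, prompt)
different_case = string_inclusion_ignore_punctuation("Adds two numbers", prompt)
inner_newline = string_inclusion_ignore_punctuation("adds two\nnumbers", prompt)
print(same_text, different_case, inner_newline)
```

Only the first call matches: the comma, period, and trailing newline are ignored, while a capital letter or a newline inside the excluded text prevents a match.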

Now we apply it to our data frame.

In [None]:
prompt_data_raw["exclude"] = prompt_data_raw.apply(do_exclude, axis=1)
# remove all excluded prompts
without_excluded_prompts = prompt_data_raw[~prompt_data_raw["exclude"]]
prompt_data = without_excluded_prompts.copy()
len(prompt_data)

`interactions.csv` does not store the function parameters separately, so we extract them from the function signature in each prompt and store them in a new `args` column.

In [None]:
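def get_args(p):
    # the parameters are whatever sits between the first '(' and the first ')' in the prompt
    try:
        loc1 = p.index('(')
        loc2 = p.index(')')
    except ValueError as err:
        # str.index raises ValueError when the character is missing
        raise IndexError(f"issue with removing signature from prompt: {p}") from err
    return [x.strip() for x in p[loc1+1:loc2].split(',')]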
prompt_data["args"] = prompt_data["prompt"].apply(get_args)
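
To make the extraction concrete, here is a sketch of the same slicing on hypothetical prompts (the function names and docstrings are invented for illustration). It shows two behaviors worth keeping in mind: default values stay attached to the parameter, and an empty signature yields a single empty string.

```python
def get_args(p):
    # Same slicing as above, without the error handling.
    loc1 = p.index('(')
    loc2 = p.index(')')
    return [x.strip() for x in p[loc1 + 1:loc2].split(',')]

# Hypothetical prompts that begin with a function signature.
with_default = get_args('def add_up(arr, start=0):\n    """sum the numbers in arr"""')
no_params = get_args('def greet():\n    """say hello"""')
print(with_default, no_params)
```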

The first preprocessing step normalizes function names and parameter names, which differ from problem to problem. We do this prior to tokenization because, once the text is tokenized, the other content in the dataframe is no longer available. The functions below do regex-based substitutions with `re.sub` on the `submitted_text` (i.e. prompt) field of every prompt, replacing the function name, the parameter names, and the words `return`/`returns`.

We wrap the replacement terms in `å` (a character not present in the dataset) so they are easy to identify in later analyses; the tokenizer later swaps `å` for `*` for presentation.

In [None]:
def arg_helper(arg, val):
    current = re.sub(r'([\[\s,\(:=\-\+\"]|^)' + re.escape(arg) + r'([\s,\)\.\[\]\+:=\-\()\"]|$)', r'\1åPARAMå\2',val) #escape arg so regex metacharacters in a parameter are matched literally
    current = re.sub("'" + re.escape(arg) + "'", "'åPARAMå'", current) #do not replace possessives, only strings
    return current
    
def replace_args(row):
    for arg in row['args']: 
        row['submitted_text'] = arg_helper(arg, row['submitted_text'])
    return row
    
def replace_func(row):
    row['submitted_text'] = re.sub(r'(\s|^)' + row['problem'] + r'([\(\s,]|$)', r'\1åFUNCTIONNAMEå\2',row['submitted_text'])
    return row

def replace_return(row):
    row['submitted_text'] = re.sub(r'[rR]eturns', r'åRETURNå',row['submitted_text'])
    row['submitted_text'] = re.sub(r'[rR]eturn', r'åRETURNå',row['submitted_text'])
    return row

prompt_data = prompt_data.apply(replace_func, axis=1)
prompt_data = prompt_data.apply(replace_args, axis=1)
prompt_data = prompt_data.apply(replace_return, axis=1)
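
A worked example of the parameter substitution on invented sentences may help. The copy of `arg_helper` below applies the same two substitutions; `re.escape` is added here as a precaution and is an assumption of this sketch.

```python
import re

def arg_helper(arg, val):
    # Same two substitutions as above; re.escape guards against parameter
    # names that contain regex metacharacters (an assumption of this sketch).
    name = re.escape(arg)
    current = re.sub(r'([\[\s,\(:=\-\+\"]|^)' + name + r'([\s,\)\.\[\]\+:=\-\()\"]|$)',
                     r'\1åPARAMå\2', val)
    current = re.sub("'" + name + "'", "'åPARAMå'", current)
    return current

# Standalone uses are replaced; the "n" inside "return", "in", "numbers" is not.
standalone = arg_helper("n", "return n if n in numbers")
# A quoted parameter name is handled by the second substitution.
quoted = arg_helper("key", "look up 'key' in the dict")
# A possessive is left alone, since "'" is not an accepted trailing delimiter.
possessive = arg_helper("lst", "lst's length")
print(standalone, quoted, possessive, sep="\n")
```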

Tokenization for our application (Python terms mixed with English description) was atypical enough, and involved enough prioritization, that we wrote our own multi-pass regex approach. The `all_together_tokenizer` below takes a string representing a `submitted_text` and returns a list of token strings for that text.

Along the way we also lowercase all text and filter out stopwords that are not meaningful in this context. Because apostrophes look like single quotes to the tokenizer, some possessives and/or contractions may be tokenized as quoted strings rather than as separate terms.

This function was also used for tokenization for the regression analyses.

In [None]:
def all_together_tokenizer(s):
    #the set of regexes to match in priority order with () for separators we want to keep around
    #(?: ...) groups without keeping around
    regexes = [r"(\w+(?:\[[^\]]*\])+)", #regex for list indexing (list[0][1])
               r"(\[[^\]]*\])", #matches lists and lists of lists
               r"(\{[^}]*\})",  #matches dictionaries
               r"(\'[^\']*\')", #single quote strings
               r"(\"[^\"]*\")", #double quote strings
               r"(\d*\.\d+)",   #floats
               r"([!.?,\-/():;]+)", #punctuation that is "English" punctuation (hyphen escaped so it is not read as a range)
               r"(\d+)",            #numbers
               r"(==|>=|<=|\-=|\+=|!=|=|\\n|\+|\*|/|\^|\<|\>)", #comparison operators, math operators, etc.; two-character operators come first so "==" is not split into two "="
               r"\s"]                          #whitespace 
    
    applied = [] #set of applied regexes
    source = [s.lower()] #what we want to return 
    for elt in regexes: #apply regex in priority order
        for i in range(len(source)): #for each of the current strings in the return list
            skip = False #flag which determines if we have already matched the string, skip because already applied higher priority regex
            for reg in applied: 
                if re.search(reg, source[i]): #set skip
                    skip = True
            if not skip:
                #from https://www.nltk.org/api/nltk.tokenize.regexp.html
                source[i] = re.split(elt, source[i])
            else: #if we are keeping it around unchanged, put in a list so flatten works consistently
                source[i] = [source[i]]
        source = [val for t in source for val in t] #split returns a list, we want one list, not nested lists
        applied.append(elt)
    #remove some common stop words that are not useful to us - determined collaboratively. 
    source = filter(lambda x: x not in ["the", "a", "an", "is", "to", ",", ".", "be", "are", "at", "of", "it", "as"], source)
    
    #replace å with * for better printing
    source = map(lambda x: x.replace('å', '*') if "å" in x else x, source)

    #removes empty strings, https://stackoverflow.com/questions/30933216/split-by-regex-without-resulting-empty-strings-in-python
    return list(filter(None, source)) 
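
The core mechanism is `re.split` with a capturing group (which keeps the matched separator as its own token) combined with a rule that a piece already claimed by an earlier regex is never split again. The reduced sketch below uses a hypothetical `mini_tokenizer` with only three patterns to show why the order of the patterns matters; it is not the full tokenizer.

```python
import re

def mini_tokenizer(s, regexes):
    # Reduced version of the multi-pass idea: apply patterns in priority order,
    # leaving alone any piece that an earlier pattern already matches.
    applied, source = [], [s]
    for pattern in regexes:
        next_source = []
        for piece in source:
            if any(re.search(done, piece) for done in applied):
                next_source.append(piece)
            else:
                next_source.extend(re.split(pattern, piece))
        source = next_source
        applied.append(pattern)
    return [tok for tok in source if tok]  # drop empty strings

FLOATS, INTS, SPACE = r"(\d*\.\d+)", r"(\d+)", r"\s"

floats_first = mini_tokenizer("round 3.5", [FLOATS, INTS, SPACE])
ints_first = mini_tokenizer("round 3.5", [INTS, FLOATS, SPACE])
print(floats_first)
print(ints_first)
```

With floats ahead of integers, `3.5` survives as one token; with the order reversed, the integer pattern claims `3` and `5` first and the float is broken apart.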

Now we run a `tf-idf` analysis: we first fit and transform the model, then compute mean scores per class and plot heatmaps.
Note that we use the tokenizer built above rather than a built-in tokenizer.

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

#this approach for dealing with already tokenized terms for TFIDF 
#inspired by: https://www.davidsbatista.net/blog/2018/02/28/TfidfVectorizer/
tfidf = TfidfVectorizer(
    analyzer='word',
    token_pattern=None,
    tokenizer=all_together_tokenizer)

X = tfidf.fit_transform(prompt_data['submitted_text'].to_numpy().tolist()) 
print(tfidf.vocabulary_)
print(tfidf.get_feature_names_out())

Calculate the mean tf-idf score of each word, both overall and per class.

In [None]:
#This mean calculation code was taken directly from: https://buhrmann.github.io/tfidf-analysis.html
import numpy as np
def top_tfidf_feats(row, features, top_n=25):
    ''' Get top n tfidf values in row and return them with their corresponding feature names.'''
    topn_ids = np.argsort(row)[::-1][:top_n]
    top_feats = [(features[i], row[i]) for i in topn_ids]
    df = pd.DataFrame(top_feats)
    df.columns = ['feature', 'tfidf']
    return df

def top_mean_feats(Xtr, features, grp_ids=None, min_tfidf=0.1, top_n=50):
    ''' Return the top n features that on average are most important amongst documents in rows
        identified by indices in grp_ids. '''
    if grp_ids:
        D = Xtr[grp_ids].toarray()
    else:
        D = Xtr.toarray()

    D[D < min_tfidf] = 0
    tfidf_means = np.mean(D, axis=0)
    return top_tfidf_feats(tfidf_means, features, top_n)

def top_feats_by_class(Xtr, y, features, min_tfidf=0.1, top_n=25):
    ''' Return a list of dfs, where each df holds top_n features and their mean tfidf value
        calculated across documents with the same class label. '''
    dfs = []
    labels = np.unique(y)
    for label in labels:
        ids = np.where(y==label)
        feats_df = top_mean_feats(Xtr, features, ids, min_tfidf=min_tfidf, top_n=top_n)
        feats_df.label = label
        dfs.append(feats_df)
    return dfs
print(top_mean_feats(X, tfidf.get_feature_names_out()))
success = top_feats_by_class(X, prompt_data["is_success"], tfidf.get_feature_names_out())
attempt = top_feats_by_class(X, prompt_data["attempt"], tfidf.get_feature_names_out())
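
The thresholded mean inside `top_mean_feats` can be checked by hand on a tiny matrix. The values and feature names below are invented; the steps mirror the function (zero out scores below `min_tfidf`, average each column, sort descending).

```python
import numpy as np

toy_features = np.array(["return", "list", "sum"])
# Two hypothetical documents (rows) by three features (columns).
D = np.array([[0.05, 0.60, 0.00],
              [0.30, 0.20, 0.08]])

min_tfidf = 0.1
D[D < min_tfidf] = 0             # scores below the threshold are treated as absent
tfidf_means = D.mean(axis=0)     # zeroed entries still count in the denominator
order = np.argsort(tfidf_means)[::-1]
ranked = [str(f) for f in toy_features[order]]
print(tfidf_means, ranked)
```

Here `return` averages 0.15 rather than 0.175, because its 0.05 score was zeroed before averaging, and `sum` drops to 0 entirely.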

Generate heatmaps to compare tfidf scores across the `First Success`, `First Failure`, `Last Success`, and `Last Failure` subsets. `plot_heatmap_overlap` shows only the words that appear among the top features of all four subsets, and `plot_heatmap_all` shows every word returned by `top_feats_by_class` for those subsets.

In [None]:
import seaborn as sns
from matplotlib import pyplot as plt

sns.set_theme(font_scale=1.2)
#np.unique sorts the labels, so dfs[0..3] are the four named subsets and `Other` (if present) is dfs[4]
def plot_heatmap_overlap(dfs):
    #name each tfidf column after its class label; rename returns a copy, so dfs is not mutated
    labeled = [df.rename(columns={'tfidf': df.label}) for df in dfs]
    #inner joins keep only the features present in all four subsets
    combined = labeled[0].merge(labeled[1], how='inner', on=['feature']).merge(labeled[2], how='inner', on=['feature']).merge(labeled[3], how='inner', on=['feature'])
    combined = combined.set_index('feature')
    combined = combined.sort_index()
    plt.subplots(figsize=(7,6))
    sns.heatmap(combined, annot=True, fmt='.2f', cmap = "YlGnBu")
def plot_heatmap_all(dfs):
    labeled = [df.rename(columns={'tfidf': df.label}) for df in dfs]
    #outer joins keep every feature; a feature missing from a subset shows up as NaN
    concat = pd.merge(labeled[0], labeled[1], on='feature', how='outer').merge(labeled[2], on='feature', how='outer').merge(labeled[3], on='feature', how='outer')
    concat = concat.set_index('feature')
    concat = concat.sort_index()
    plt.subplots(figsize=(10,13))
    sns.heatmap(concat, annot=True, fmt='.2f', cmap = "YlGnBu")

    
plot_heatmap_overlap(attempt)
plot_heatmap_all(attempt)
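
The difference between the two heatmaps comes down to inner versus outer merges on `feature`. A sketch with two invented frames (the scores are made up, and only two subsets are shown for brevity):

```python
import pandas as pd

# Hypothetical top-feature tables for two subsets.
first_success = pd.DataFrame({"feature": ["list", "return", "sum"],
                              "First Success": [0.30, 0.25, 0.10]})
last_failure = pd.DataFrame({"feature": ["list", "loop", "return"],
                             "Last Failure": [0.28, 0.12, 0.20]})

# Inner merge: only features that appear in both tables.
overlap = (first_success.merge(last_failure, on="feature", how="inner")
           .set_index("feature").sort_index())
# Outer merge: every feature, with NaN where a subset lacks it.
everything = (first_success.merge(last_failure, on="feature", how="outer")
              .set_index("feature").sort_index())
print(overlap)
print(everything)
```

In the outer result, `loop` has no `First Success` score and `sum` has no `Last Failure` score; `sns.heatmap` leaves such missing cells blank.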
