In [1]:
import pandas as pd
import json
import numpy as np
import openai
import glob
from tqdm.auto import tqdm, trange
from implementations.misc import utils
import re
from collections import namedtuple

In [2]:
with open("../../keys/openai_api_key.txt") as f:
    openai.api_key = f.read()

# df_trials = pd.read_csv("../../data/processed/trials.tsv", sep='\t')
# df_trials.phase = ['practice' if p == 1 else 'test' for p in df_trials.phase]

# puzzle_ids = {(s, p, t): i for i, (s, p, t) in enumerate(df_trials[['subject_id', 'phase', 'trial']].values)}
# df_trials['puzzle_id'] = [puzzle_ids[(s, p, t)] for (s, p, t) in df_trials[['subject_id', 'phase', 'trial']].values]

df_puzzles = pd.read_csv("../../data/processed/puzzles.tsv", sep='\t')
puzzle_ids = {(s, p, t): i for i, (s, p, t) in enumerate(df_puzzles[['subject_id', 'phase', 'trial']].values)}
df_puzzles['puzzle_id'] = [puzzle_ids[(s, p, t)] for (s, p, t) in df_puzzles[['subject_id', 'phase', 'trial']].values]

In [3]:
df_house_types = pd.read_table('../../data/processed/house_types.tsv')
# df_house_types = df_house_types[df_house_types.phase != 'questionnaire']
df_house_types['puzzle_id'] = [puzzle_ids[(s, p, t)] for (s, p, t) in df_house_types[['subject_id', 'phase', 'trial']].values]

In [18]:
df_puzzles

Unnamed: 0,subject_id,sid_hash,phase,trial,key,type,row,column,number,puzzle_id
0,1,0e8a79,practice,1,goal,target,1,1,9,14
1,1,0e8a79,practice,1,targetSingle,target,4,8,9,14
2,1,0e8a79,practice,1,targetDouble,target,3,5,9,14
3,1,0e8a79,practice,1,targetBox,target,7,3,9,14
4,1,0e8a79,practice,1,distractorSingle,distractor,5,5,5,14
...,...,...,...,...,...,...,...,...,...,...
365845,271,6ffc51,test,64,emptySingle,empty,2,8,0,365849
365846,271,6ffc51,test,64,emptyDouble,empty,9,8,0,365849
365847,271,6ffc51,test,64,emptyBox0,empty,4,8,0,365849
365848,271,6ffc51,test,64,emptyBox1,empty,5,8,0,365849


In [28]:
class ddict(dict):
    """dot.notation access to dictionary attributes"""
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__

In [29]:
def df_to_array(df):
    df = df[df.type != 'empty']
    array = np.zeros((9, 9), int).astype(str)
    for r, c, coord, v in df[['row', 'column', 'key', 'number']].values:
        d = 'X' if coord == 'goal' else v
        array[r-1, c-1] = d
    return array

def draw_grid(array, use_numbers=False):
    """
    If goal = (row, column) is provided, places an X at the goal location
    """
    if use_numbers:
        return str(array)[1:-1].replace('\n ', '\n').replace("'", '')
    
    rows = ["+-------+-------+-------+"]
    for r in range(9):
        row = ['|']
        for c in range(9):
            d = array[r, c]
            row.append('.' if d == '0' else d)
            if c in (2, 5, 8):
                row.append('|')
        row = ' '.join(row)
        rows.append(row)

        if r in (2, 5, 8):
            rows.append("+-------+-------+-------+")
    s = '\n'.join(rows)
    return s

In [30]:
def get_list_format(df_puzzle):
    coords = []
    df = df_puzzle[df_puzzle.type != 'empty'].sample(frac=1)
    df = df[df.key != 'goal']
    for r, c, v in df[['row', 'column', 'number']].values:
        if v > 0:
            coords.append(f"(row {r}, column {c}) contains {v}")
    return coords

In [35]:
def create_hs_list_prompt(df_puzzle, df_house_types):
    df_puzzle = df_puzzle[df_puzzle.type != 'empty']
    target = df_puzzle[df_puzzle.type == 'target'].number.values[0]
    goal = df_puzzle[df_puzzle.key == 'goal'][['row', 'column']].values[0]
    df_puzzle = df_puzzle[df_puzzle.key != 'goal'].sort_values(['row', 'column'])
    house_type = df_puzzle[['subject_id', 'phase', 'trial']].head(1).merge(df_house_types).house_type[0]
    house_index = goal[0] if house_type == 'row' else goal[1]
    # question = "I am a highly intelligent puzzle solving bot.\n"
    array = df_to_array(df_puzzle)
    question = ["Q: Consider a Sudoku grid with the following numbers:"]
    question += get_list_format(df_puzzle)
    question.append(f'What number must (row {goal[0]}, column {goal[1]}) contain?')
    question.append(f'Hint: use the hidden single technique along {house_type} {house_index}.')
    question.append(f"A: (row {goal[0]}, column {goal[1]}) must contain")
    question = '\n'.join(question)
    answer = f'{target}'
    return question, answer

In [62]:
def create_hs_array_prompt(df_puzzle, df_house_types):
    target = df_puzzle[df_puzzle.type == 'target'].number.values[0]
    goal = df_puzzle[df_puzzle.key == 'goal'][['row', 'column']].values[0]
    house_type = df_puzzle[['subject_id', 'phase', 'trial']].head(1).merge(df_house_types).house_type[0]

    array = df_to_array(df_puzzle)
    question = ["Q: Consider a Sudoku grid with the following numbers:"]
    question.append(draw_grid(array, use_numbers=True))
    question.append(f'What number must X be?')
    question.append(f'Hint: use the hidden single technique along the {house_type} that contains X.')
    question.append(f"A: X must be")
    question = '\n'.join(question)
    answer = f'{target}'
    return question, answer

In [63]:
def find_last_number(s):
    return int(re.findall("[0-9]+", s)[-1])

# Hidden Single

In [69]:
mode = 'list'
# mode = 'array'
n = 50
hs_pids = df_house_types.sample(n, random_state=0).puzzle_id.values
df = df_puzzles[df_puzzles.puzzle_id.isin(hs_pids)]

rows = []
for puzzle_id, df_puzzle in df.groupby('puzzle_id'):
    array = df_to_array(df_puzzle)
    grid = draw_grid(array)
    if mode == 'list':
        question, answer = create_hs_list_prompt(df_puzzle, df_house_types)
    else:
        question, answer = create_hs_array_prompt(df_puzzle, df_house_types)
    rows.append({'puzzle_id': puzzle_id, 'array': array, 'grid': grid, 'question': question, 'answer': answer})
df_prompts = pd.DataFrame(rows)
print(df_prompts.question.values[0])

Q: Consider a Sudoku grid with the following numbers:
(row 6, column 7) contains 9
(row 7, column 4) contains 6
(row 2, column 1) contains 7
(row 8, column 2) contains 6
(row 9, column 2) contains 9
(row 3, column 4) contains 9
(row 4, column 1) contains 5
(row 3, column 9) contains 6
(row 5, column 1) contains 8
What number must (row 1, column 1) contain?
Hint: use the hidden single technique along column 1.
A: (row 1, column 1) must contain


In [70]:
model = "text-davinci-003"
responses = []
for prompt in tqdm(df_prompts.question.values):
    prompt = "I am a highly intelligent puzzle solving bot.\n" + prompt
    response = openai.Completion.create(
      model=model,
      prompt=prompt,
      temperature=0,
      max_tokens=100,
      top_p=1,
      frequency_penalty=0.0,
      presence_penalty=0.0,
      stop=["\n"]
    )
    responses.append(response)

  0%|          | 0/50 [00:00<?, ?it/s]

In [71]:
entries = []
total_correct = 0
for i, (g, q, a, r) in enumerate(zip(df_prompts.grid, df_prompts.question, df_prompts.answer, responses)):
    r = r['choices'][0]['text']
    correct = int(a) == find_last_number(r)
    total_correct += correct
    correct = 'Correct' if correct else 'Incorrect'
    entry = [f"Problem {i+1}: {correct}", 
             g, 
             "Prompt", q, 
             f"Expected: {a}", 
             f"Response: {r}"]
    entry = '\n'.join(entry)
    entries.append(entry)
entries.insert(0, f"Accuracy: {total_correct} of {len(entries)}")
entries = '\n\n'.join(entries)

dirname = f"../../data/llm/{model}/"
utils.mkdir(dirname)
with open(dirname + f"hidden_single_{mode}.txt", 'w') as f:
    f.write(entries)

In [72]:
df_problems = df_prompts[['puzzle_id', 'question', 'answer']].copy()
df_problems.answer = [find_last_number(r) for r in df_problems.answer]
df_problems['response'] = [r['choices'][0]['text'] for r in responses]
df_problems['r_parsed'] = [find_last_number(r) for r in df_problems.response]
df_problems['correct'] = df_problems.answer == df_problems.r_parsed
df_problems.to_csv(dirname + f"hidden_single_{mode}.tsv", sep='\t', index=False)

In [172]:
# # If the models were guessing arbitrarily, then we'd expect an agreement rate of 25%.
# # Using binomial distribution, P(X >= 14) = 0.00091

# with open(dirname + f"hidden_single_list.txt") as f:
#     f_list = f.readlines()
# with open(dirname + f"hidden_single_array.txt") as f:
#     f_array = f.readlines()

# list_responses = np.array([find_last_number(s) for s in f_list if s[:8] == 'Response'])
# array_responses = np.array([find_last_number(s) for s in f_array if s[:8] == 'Response'])
# (list_responses == array_responses).sum()

11

# 4-shot Hidden Single

In [91]:
# mode = 'list'
mode = 'array'
k = 4

df = df_house_types[~df_house_types.puzzle_id.isin(hs_pids)]
df = df.sample(n*k, random_state=0).reset_index(drop=True)
df = df_puzzles[df_puzzles.puzzle_id.isin(df.puzzle_id)]

rows = []
for pid, df_puzzle in tqdm(df.groupby('puzzle_id')):
    puzzle_id = df_puzzle.puzzle_id.values[0]
    array = df_to_array(df_puzzle)
    if mode == 'list':
        question, answer = create_hs_list_prompt(df_puzzle, df_house_types)
    else:
        question, answer = create_hs_array_prompt(df_puzzle, df_house_types)
    rows.append({'puzzle_id': puzzle_id, 'question': question, 'answer': answer})
df_prompts = pd.DataFrame(rows)
df_prompts['problem_id'] = np.repeat(np.arange(len(df_prompts) / k, dtype=int), k)

  0%|          | 0/200 [00:00<?, ?it/s]

In [92]:
rows = []
for target_pid, (problem_id, df) in zip(hs_pids, df_prompts.groupby('problem_id')):
    df_puzzle = df_puzzles[df_puzzles.puzzle_id == target_pid]
    array = df_to_array(df_puzzle)
    if mode == 'list':
        question, answer = create_hs_list_prompt(df_puzzle, df_house_types)
    else:
        question, answer = create_hs_array_prompt(df_puzzle, df_house_types)
    support = '\n\n'.join(df.question + ' ' + df.answer)
    prompt = support + '\n\n' + question
    row = {'problem_id': problem_id, 'prompt': prompt, 'answer': answer, 'pid_target': target_pid}
    for i, puzzle_id in enumerate(df.puzzle_id, 1):
        row[f"pid_{i}"] = puzzle_id
    rows.append(row)
    
df_problems = pd.DataFrame(rows)
df_problems.answer = [find_last_number(a) for a in df_problems.answer]

In [93]:
model = "text-davinci-003"

responses = []
for prompt in tqdm(df_problems.prompt):
    p = "I am a highly intelligent puzzle solving bot.\n\n" + prompt
    response = openai.Completion.create(
      model=model,
      prompt=p,
      temperature=0,
      max_tokens=100,
      top_p=1,
      frequency_penalty=0.0,
      presence_penalty=0.0,
      stop=["\n"]
    )
    responses.append(response['choices'][0]['text'])
df_problems['response'] = responses
df_problems['r_parsed'] = [find_last_number(r) for r in df_problems.response]
df_problems['correct'] = df_problems.answer == df_problems.r_parsed

  0%|          | 0/50 [00:00<?, ?it/s]

In [94]:
dirname = f"../../data/llm/{model}/"
utils.mkdir(dirname)
df_problems.to_csv(dirname + f"hidden_single_k4_{mode}.tsv", sep='\t', index=False)

In [95]:
entries = []
total_correct = df_problems.correct.sum()
for record in df_problems.to_records():
    correct = 'Correct' if record.correct else 'Incorrect'
    entry = [f"problem_id {record.problem_id}: {record.correct}", 
             "Prompt", record.prompt, 
             f"Expected: {record.answer}", 
             f"Response: {record.response}"]
    entry = '\n'.join(entry)
    entries.append(entry)
entries.insert(0, f"Accuracy: {total_correct} of {len(entries)}")
entries = '\n\n'.join(entries)
with open(dirname + f"hidden_single_k4_{mode}.txt", 'w') as f:
    f.write(entries)

In [96]:
df_problems.correct.mean()

0.08

# With CoT - positive only, k=4

In [152]:
def get_positive_explanation(df_puzzle, df_house_types):
    contents = ddict(**{r.key: f"(row {r.row}, column {r.column})" for r in df_puzzle.to_records()})
    house_type = df_puzzle[['subject_id', 'phase', 'trial']].head(1).merge(df_house_types).house_type[0]
    goal_row, goal_column, target = df_puzzle[df_puzzle.key == 'goal'][['row', 'column', 'number']].values[0]
    distractor = df_puzzle[df_puzzle.type == 'distractor'].number.values[0]
    house = f'row {goal_row}' if house_type == 'row' else f'column {goal_column}'
    orthogonal = 'column' if house_type == 'row' else 'row'
    
    answer = f"""
        Between the two numbers that appear 3 times on the grid, 
        {target} and {distractor},
        we arbitrarily choose {target} as a candidate to check whether it can go in the other cells. 
        {target} cannot go in {contents.inhouse0}, {contents.inhouse1}, or {contents.inhouse2}
        because these cells already have numbers in them.
        {contents.emptyBox0}, {contents.emptyBox1}, and {contents.emptyBox2}
        cannot contain {target} because they share a 3x3 box with {target} in {contents.targetBox}.
        {contents.emptySingle} cannot contain {target} because it shares a {orthogonal} with {target} in 
        {contents.targetSingle}, and {contents.emptyDouble} cannot contain {target} 
        because it shares a {orthogonal} with {target} in {contents.targetDouble}.
        Since we have eliminated all other eight cells in {house} as potential locations to place to put the {target},
        we can conclude that {contents.goal} must contain the {target}.
    """
    return ' '.join(answer.split())

In [153]:
def get_negative_explanation(df_puzzle, df_house_types):
    contents = ddict(**{r.key: f"(row {r.row}, column {r.column})" for r in df_puzzle.to_records()})
    house_type = df_puzzle[['subject_id', 'phase', 'trial']].head(1).merge(df_house_types).house_type[0]
    goal_row, goal_column, target = df_puzzle[df_puzzle.key == 'goal'][['row', 'column', 'number']].values[0]
    distractor = df_puzzle[df_puzzle.type == 'distractor'].number.values[0]
    house = f'row {goal_row}' if house_type == 'row' else f'column {goal_column}'
    orthogonal = 'column' if house_type == 'row' else 'row'
    
    answer = f"""
        Between the two numbers that appear 3 times on the grid, 
        {distractor} and {target},
        we arbitrarily choose {distractor} as a candidate to check whether it can go in the other cells. 
        {distractor} cannot go in {contents.inhouse0}, {contents.inhouse1}, or {contents.inhouse2}
        because these cells already have numbers in them.
        {contents.emptyBox0}, {contents.emptyBox1}, and {contents.emptyBox2}
        cannot contain {distractor} because they share a 3x3 box with {distractor} in {contents.distractorBox}.
        {contents.emptyDouble} cannot contain {distractor} because it shares a {orthogonal} with {distractor} in 
        {contents.distractorDouble}. However, there are no numbers that prevent either {contents.goal} or 
        {contents.emptySingle} from containing {distractor}, so we cannot be certain about either.
        Choosing {target} as the new candidate and applying a similar logic, we can see that the only cell in {house} that can contain
        {target} is {contents.goal}, so we can conclude that the answer is {target}.
    """
    return ' '.join(answer.split())

In [154]:
def get_shortcut_explanation(df_puzzle, df_house_types):
    contents = ddict(**{r.key: f"(row {r.row}, column {r.column})" for r in df_puzzle.to_records()})
    house_type = df_puzzle[['subject_id', 'phase', 'trial']].head(1).merge(df_house_types).house_type[0]
    goal_row, goal_column, target = df_puzzle[df_puzzle.key == 'goal'][['row', 'column', 'number']].values[0]
    distractor = df_puzzle[df_puzzle.type == 'distractor'].number.values[0]
    house = f'row {goal_row}' if house_type == 'row' else f'column {goal_column}'
    orthogonal = 'column' if house_type == 'row' else 'row'
    
    answer = f"""
        We check whether the two numbers that appear 3 times on the grid, {target} and {distractor},
        can go in the other cells.
        {contents.emptySingle} shares a {house_type} with {target} but not {distractor},
        so we can conclude that the answer is {target}.
    """
    return ' '.join(answer.split())

In [155]:
def create_hs_explanation_list_prompt(df_puzzle, df_house_types):
    target = df_puzzle[df_puzzle.type == 'target'].number.values[0]
    goal = df_puzzle[df_puzzle.key == 'goal'][['row', 'column']].values[0]
    house_type = df_puzzle[['subject_id', 'phase', 'trial']].head(1).merge(df_house_types).house_type[0]
    house_index = goal[0] if house_type == 'row' else goal[1]
    
    array = df_to_array(df_puzzle)
    question = ["Q: Consider a Sudoku grid with the following numbers:"]
    question += get_list_format(df_puzzle)
    # for r, c, v in df_puzzle[['row', 'column', 'value']].values:
    #     if v > 0:
    #         question.append(f"(row {r}, column {c}) contains {v}")
    question.append(f'What number must (row {goal[0]}, column {goal[1]}) contain?')
    question.append(f'Hint: use the hidden single technique along {house_type} {house_index}.')
    prefix = f"""
        A: To solve for (row {goal[0]}, column {goal[1]}), 
        we need to find the number that cannot go in any other cell in its {house_type}.
    """
    question.append(' '.join(prefix.split()))
    question = '\n'.join(question)
    answer = f'{target}'
    return question, answer

In [156]:
def create_hs_explanation_array_prompt(df_puzzle, df_house_types):
    target = df_puzzle[df_puzzle.type == 'target'].number.values[0]
    goal = df_puzzle[df_puzzle.key == 'goal'][['row', 'column']].values[0]
    house_type = df_puzzle[['subject_id', 'phase', 'trial']].head(1).merge(df_house_types).house_type[0]

    array = df_to_array(df_puzzle)
    question = ["Q: Consider a Sudoku grid with the following numbers:"]
    question.append(draw_grid(array, use_numbers=True))
    question.append(f'What number must X be?')
    question.append(f'Hint: use the hidden single technique along {house_type} that contains X.')
    prefix = f"""
        A: To solve for X in (row {goal[0]}, column {goal[1]}), 
        we need to find the number that cannot go in any other cell in its {house_type}.
    """
    question.append(' '.join(prefix.split()))
    question = '\n'.join(question)
    answer = f'{target}'
    return question, answer

In [157]:
# mode = 'list'
mode = 'array'
explanation_type = 'mix' # positive, mix
k = 4

df = df_house_types[~df_house_types.puzzle_id.isin(hs_pids)]
df = df.sample(n*k, random_state=0).reset_index(drop=True)
df = df_puzzles[df_puzzles.puzzle_id.isin(df.puzzle_id)]

rows = []
for i, (pid, df_puzzle) in enumerate(tqdm(df.groupby('puzzle_id'))):
    puzzle_id = df_puzzle.puzzle_id.values[0]
    array = df_to_array(df_puzzle)
    if mode == 'list':
        question, answer = create_hs_explanation_list_prompt(df_puzzle, df_house_types)
    else:
        question, answer = create_hs_explanation_array_prompt(df_puzzle, df_house_types)
    if explanation_type == 'shortcut':
        explanation = get_shortcut_explanation(df_puzzle, df_house_types)
    elif explanation_type == 'mix' and i%2 == 1:
        explanation = get_negative_explanation(df_puzzle, df_house_types)
    else:
        explanation = get_positive_explanation(df_puzzle, df_house_types)
    rows.append({'puzzle_id': puzzle_id, 'question': question, 'answer': answer, 'explanation': explanation})
df_prompts = pd.DataFrame(rows)
df_prompts['problem_id'] = np.repeat(np.arange(len(df_prompts) / k, dtype=int), k)

  0%|          | 0/200 [00:00<?, ?it/s]

In [158]:
rows = []
for target_pid, (problem_id, df) in zip(hs_pids, df_prompts.groupby('problem_id')):
    df_puzzle = df_puzzles[df_puzzles.puzzle_id == target_pid]
    array = df_to_array(df_puzzle)
    if mode == 'list':
        question, answer = create_hs_explanation_list_prompt(df_puzzle, df_house_types)
    else:
        question, answer = create_hs_explanation_array_prompt(df_puzzle, df_house_types)
    explanation = get_positive_explanation(df_puzzle, df_house_types)
    support = '\n\n'.join(df.question + ' ' + df.explanation)
    prompt = support + '\n\n' + question
    row = {'problem_id': problem_id, 'prompt': prompt, 'explanation': explanation, 'answer': answer, 'pid_target': target_pid}
    for i, puzzle_id in enumerate(df.puzzle_id, 1):
        row[f"pid_{i}"] = puzzle_id
    rows.append(row)
    
df_problems = pd.DataFrame(rows)

In [159]:
print(df_problems.prompt[0])

Q: Consider a Sudoku grid with the following numbers:
[X 0 0 0 0 0 0 0 0]
[0 0 0 0 5 0 0 6 0]
[0 0 0 0 0 0 0 5 0]
[0 0 0 0 0 0 0 0 6]
[0 0 6 0 0 0 0 0 0]
[0 5 0 0 0 0 0 0 0]
[7 0 0 0 0 0 0 0 0]
[9 0 0 0 0 0 0 0 0]
[1 0 0 0 0 0 0 0 0]
What number must X be?
Hint: use the hidden single technique along column that contains X.
A: To solve for X in (row 1, column 1), we need to find the number that cannot go in any other cell in its column. Between the two numbers that appear 3 times on the grid, 5 and 6, we arbitrarily choose 5 as a candidate to check whether it can go in the other cells. 5 cannot go in (row 8, column 1), (row 9, column 1), or (row 7, column 1) because these cells already have numbers in them. (row 4, column 1), (row 5, column 1), and (row 6, column 1) cannot contain 5 because they share a 3x3 box with 5 in (row 6, column 2). (row 3, column 1) cannot contain 5 because it shares a row with 5 in (row 3, column 8), and (row 2, column 1) cannot contain 5 because it shares a ro

In [160]:
model = "text-davinci-003"

responses = []
for prompt in tqdm(df_problems.prompt):
    p = "I am a highly intelligent puzzle solving bot.\n\n" + prompt
    response = openai.Completion.create(
      model=model,
      prompt=p,
      temperature=0,
      max_tokens=256,
      top_p=1,
      frequency_penalty=0.0,
      presence_penalty=0.0,
      stop=["\n"]
    )
    responses.append(response['choices'][0]['text'])
df_problems['response'] = responses
df_problems['r_parsed'] = [find_last_number(r) for r in df_problems.response]
answers = [find_last_number(a) for a in df_problems.answer]
df_problems['correct'] = answers == df_problems.r_parsed
df_problems.correct.sum()

  0%|          | 0/50 [00:00<?, ?it/s]

23

In [161]:
dirname = f"../../data/llm/{model}/"
utils.mkdir(dirname)
df_problems.to_csv(dirname + f"hidden_single_expk4_{mode}_{explanation_type}.tsv", sep='\t', index=False)