In [13]:
# Backend whose answers are generated and evaluated; change the index to switch
# (0: 'llama', 1: 'gpt', 2: 'deepseek').
model_name = ('llama', 'gpt', 'deepseek')[1]

import getpass
import os
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from glob import glob
import json
from num2words import num2words
import pandas as pd

In [2]:
def _make_ollama_chat(ollama_model):
    """Build a local Ollama chat client with the settings shared by all local models."""
    return ChatOllama(
        model=ollama_model,
        temperature=0,
        num_ctx=4096,
        num_predict=2048,
        verbose=True
    )


# The Llama client is created unconditionally: the JSON-conversion cell calls
# `llama.invoke` no matter which backend `model_name` selects.
llama = _make_ollama_chat("llama3.2")

if model_name == 'llama':
    model = llama
elif model_name == 'deepseek':
    model = _make_ollama_chat("deepseek-r1")
else:
    # Every other value (including 'gpt') selects the OpenAI backend; ask for the
    # key interactively only when the environment does not already provide it.
    if not os.environ.get("OPENAI_API_KEY"):
        os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")
    model = ChatOpenAI(
        model="gpt-4o",
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2,
        # api_key="...",  # if you prefer to pass api key in directly instead of using env vars
        # base_url="...",
        # organization="...",
        # other params...
    )

In [3]:
# Load up to 100 questions from every JSON-lines file; each question is tagged with
# the stem of the file it came from as its 'type'.
files = glob('./selected_questions/*.jsonl')
questions = []
for path in files:
    # os.path handles both '/' and '\\' separators, unlike slicing on rfind('/').
    question_type = os.path.splitext(os.path.basename(path))[0]
    with open(path, 'r', encoding='utf-8') as file:
        loaded = 0
        for line in file:
            if loaded >= 100:
                break
            # Iterating the file stops cleanly at EOF; blank lines are skipped so a
            # short file or a trailing newline no longer raises JSONDecodeError.
            if not line.strip():
                continue
            question = json.loads(line)
            question['type'] = question_type
            questions.append(question)
            loaded += 1

In [4]:
# system_prompt0 = str("Answer the provided user question. The answer should not exceed twenty words.")


# System prompt for the answering model: one instruction per line, joined with
# newlines (no trailing newline after the last rule).
system_prompt1 = "\n".join((
    "Answer the provided user question while satisfying the following requirements:",
    "1. do not include any parts of the question in the answer you must provide the answer directly.",
    "2. provide only the property the user is asking for like name of an entity, its location, distance, direction.",
    "3. don't provide information the user didn't ask for.",
    "4. any number must be written as words and rounded to the nearest ten.",
    "5. only use metric units.",
))


# System prompt for the second pass, which turns a free-text answer into JSON.
# The %OTHER_ATT% placeholder is substituted by the JSON-conversion cell: with an extra
# '"<attribute>": string' line for 'multihop1' questions, with '' otherwise.
# NOTE(review): the schema example is itself malformed ('"name" string' lacks the colon
# and trailing comma) and the text misspells "measurment" / "degress". The literal is left
# untouched here because editing it changes what the model is asked; confirm before fixing.
system_prompt2 = str("Given a question and a text answer, parse the text answer to json format."
                     " The location must be provided as a complete address,"
                     " any measurment must be in metric units,"
                     " and directions must be converted to azimuth angle in degress."
                     " Try to match the following schema:"
                     """
                        {
                            "name" string
                            "address": string,
                            "count": integer,
                            "distance": integer,
                            "length": integer,
                            "area": integer,
                            "azimuth_angle": integer,
                            %OTHER_ATT%
                        }
                    If a value is missing don't include it in the output, and don't write any comments.
                    All json blocks must be enclosed with ```json and ```
                     """)

# Alternative parsing prompt meant to accompany `json_schema`; it is not referenced by
# any other cell visible in this notebook.
new_prompt = (
    "Given a question and a text answer, parse the text answer to json format."
    " The schema must match the provided schema."
    " If an attribute doesn't have a related value in the answer, you can skip it."
)
# JSON Schema describing a structured answer. Each row below is
# (property name, JSON type, description); the comprehension expands every row into
# the usual {"type": ..., "description": ...} entry, in the order listed.
json_schema = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "title": "answer",
    "type": "object",
    "properties": {
        prop_name: {"type": json_type, "description": description}
        for prop_name, json_type, description in (
            ("name", "string", "The name of the entity, like name of the restuarant, park, etc."),
            ("address", "string", "The full address of the entity."),
            ("distance", "number", "The distance in meters from the location mentioned in the question to this entity."),
            ("length", "number", "The length in meters of the entity that matches the descirption in the question."),
            ("area", "number", "The area of the entity in meters squared that matches the descirption in the question."),
            ("azimuth_angle", "number", "The azimuth angle at which the entity can be found from the location mentioned in the question."),
            ("Nickname", "string", "The nickname of the entity that matches the question."),
            ("Architect", "string", "The name of the architect of the entity that matches the question."),
            ("Established", "number", "The year the entity that matches the question was established."),
            ("Director", "string", "The of the name of the director of the entity that matches the question."),
            ("Date opened", "string", "The date at which the entity that matches the question was opened."),
            ("Nearest city", "string", "The nearest city to the entity that matches the question."),
            ("Opened", "number", "The year at which the entity that matches the question was opened."),
            ("Motto", "string", "The motto of the entity that matches the question."),
            ("Designed by", "string", "The name of the designer of the entity that matches the question."),
            ("Affiliated university", "string", "The name of a university affiliated with the entity that matches the question."),
            ("Former names", "string", "A former name of the entity that matches the question."),
            ("Helipad", "string", "Description about weather the the entity that matches the question has a helipad."),
            ("Capacity", "number", "The capacity of the entity that matches the question."),
            ("Opening date", "string", "The opening date of the entity that matches the question."),
            ("Built", "number", "The year the entity that matches the question was built."),
            ("Mascot", "string", "The name of the mascot of the entity that matches the question."),
        )
    },
}

In [25]:
from pydantic import BaseModel, ValidationError
from typing import List, Optional


class Answer(BaseModel):
    """Structured form of a single answer.

    Every field is optional and defaults to None, so an answer that omits an
    attribute still validates. `name` and `address` previously had no default;
    under pydantic v2 an `Optional[...]` annotation without a default is a
    *required* field that merely accepts None, which made them inconsistent with
    every other field of this model.
    """
    name: Optional[str] = None
    address: Optional[str] = None
    distance: Optional[float] = None
    length: Optional[float] = None
    area: Optional[float] = None
    count: Optional[int] = None
    azimuth_angle: Optional[float] = None
    nickname: Optional[str] = None
    architect: Optional[str] = None
    established: Optional[int] = None
    director: Optional[str] = None
    date_opened: Optional[str] = None
    nearest_city: Optional[str] = None
    opened: Optional[int] = None
    motto: Optional[str] = None
    designed_by: Optional[str] = None
    affiliated_university: Optional[str] = None
    former_names: Optional[str] = None
    helipad: Optional[str] = None
    capacity: Optional[float] = None
    opening_date: Optional[str] = None
    built: Optional[int] = None
    mascot: Optional[str] = None


class Answers(BaseModel):
    """Wrapper model holding a list of `Answer` records under the `answers` field."""
    answers: List[Answer]

In [8]:
# One-off smoke test: send a single hand-picked question (position 1454 in `questions`)
# through the selected model and keep the raw text reply in `a`.
messages = [SystemMessage(content=system_prompt1)]
messages.append(HumanMessage(content=questions[1454]['question']))
a = model.invoke(messages).content

In [26]:
# Previously generated text answers, keyed by question id, so the generation loop can
# skip questions that were already answered. A missing cache file (first run for this
# model) now yields an empty cache instead of raising FileNotFoundError.
_answers = {}
if os.path.exists('./answers_%s.json' % model_name):
    with open('./answers_%s.json' % model_name, 'r') as file:
        _answers = {a['id']: a for a in json.load(file)}

In [27]:
from tqdm import tqdm

# Build one text answer per question, in question order: reuse the cached entry when
# its id is known, otherwise ask the selected model.
answers = []
for q in tqdm(questions):
    cached = _answers.get(q['id'])
    if cached is not None:
        answers.append(cached)
        continue
    messages = [
        SystemMessage(content=system_prompt1),
        HumanMessage(content=q['question']),
    ]
    reply = model.invoke(messages)
    answers.append({'content': reply.content, 'id': q['id']})

100%|██████████| 2800/2800 [00:00<00:00, 2758114.42it/s]


In [25]:
# Uncomment to persist freshly generated answers before reloading them:
# with open('./answers_%s.json' % model_name, 'w') as file:
#     file.write(json.dumps(answers, indent=2))

# Replace `answers` with whatever is stored on disk for this model.
with open('./answers_%s.json' % model_name, 'r') as file:
    answers = json.load(file)

In [29]:
# Previously converted JSON answers, keyed by question id, so the conversion loop can
# skip questions that were already processed. A missing cache file (first run for this
# model) now yields an empty cache instead of raising FileNotFoundError.
_json_answers = {}
if os.path.exists('./json_answers_%s.json' % model_name):
    with open('./json_answers_%s.json' % model_name, 'r') as file:
        _json_answers = {a['id']: a for a in json.load(file)}

In [30]:
# Convert every text answer to JSON with the local Llama model, reusing cached
# conversions when the question id is already known.
json_answers = []

# `answers` may be ordered differently from `questions` (it can be reloaded from disk),
# so index it by id once instead of rescanning the list for every mismatch. setdefault
# keeps the first entry per id, matching the old first-match linear search.
text_answer_by_id = {}
for entry in answers:
    text_answer_by_id.setdefault(entry['id'], entry)

for i, q in enumerate(tqdm(questions)):
    if q['id'] in _json_answers:
        json_answers.append(_json_answers[q['id']])
        continue

    # Multi-hop questions ask for one extra attribute that is not part of the base
    # schema; it is spliced into the prompt's schema example.
    extra_attribute = ''
    if 'multihop1' in q['type']:
        extra_attribute = '"%s": string' % q['answers'][0]['multihop_attribute']
    sys_prompt = system_prompt2.replace('%OTHER_ATT%', extra_attribute)

    # Prefer the positional entry; fall back to the id index. Previously a failed
    # lookup silently kept another question's answer.
    a = answers[i] if i < len(answers) else None
    if a is None or a['id'] != q['id']:
        if q['id'] not in text_answer_by_id:
            raise KeyError('no text answer found for question id %r' % (q['id'],))
        a = text_answer_by_id[q['id']]

    messages = [
        SystemMessage(content=sys_prompt),
        HumanMessage(content="Question: %s\nAnswer: %s" % (q['question'], a['content'])),
    ]
    json_answers.append({'content': llama.invoke(messages).content, 'id': q['id']})

100%|██████████| 2800/2800 [00:00<00:00, 3029940.97it/s]


In [14]:
# Uncomment to persist freshly converted answers before reloading them:
# with open('./json_answers_%s.json' % model_name, 'w') as file:
#         file.write(json.dumps(json_answers, indent=2))

# Replace `json_answers` with whatever is stored on disk for this model.
with open('./json_answers_%s.json' % model_name, 'r') as file:
    json_answers = json.load(file)

In [6]:
import re

# Square metres in one acre, used to convert areas the model reported in acres.
_SQUARE_METERS_PER_ACRE = 4046.8564224


def flatten_if_nested(array):
    """Recursively flatten a list that contains nested lists.

    Args:
        array: Any value.

    Returns:
        A new flat list when `array` is a list with at least one list element;
        otherwise `array` itself, unchanged (non-lists and already-flat lists).
    """
    if isinstance(array, list) and any(isinstance(item, list) for item in array):
        flattened = []
        for item in array:
            if isinstance(item, list):
                flattened.extend(flatten_if_nested(item))
            else:
                flattened.append(item)
        return flattened
    else:
        return array


def _scale_acre_areas(parsed):
    """Convert every numeric 'area' in a decoded JSON value from acres to m², in place."""
    if isinstance(parsed, list):
        for element in parsed:
            _scale_acre_areas(element)
    elif isinstance(parsed, dict):
        area = parsed.get('area')
        # null / textual areas cannot be scaled; bool is excluded because it is an int subclass.
        if isinstance(area, (int, float)) and not isinstance(area, bool):
            parsed['area'] = area * _SQUARE_METERS_PER_ACRE


def extract_json_blocks(text, i):
    """Extract and decode every ```json fenced block found in a model reply.

    Before decoding, each block is repaired for slips seen in model output:
    digit-group separators ('1_000', '1,000'), '//' comments, a trailing comma
    before '}', escaped quotes/ampersands and several objects written back to
    back (which are wrapped into a list). Areas given in acres are converted to
    square metres.

    Args:
        text: Raw model reply.
        i: Index of the question, only printed when a block cannot be decoded.

    Returns:
        A flat list with the decoded value of every block that parsed; blocks that
        fail to parse are reported on stdout and skipped.
    """
    # Regular expression pattern to match JSON blocks
    pattern = r'```[\s]*json(.*?)```'
    pattern1 = r'\b\d+(?:_\d+)*\b'   # digits grouped with underscores
    pattern2 = r'\b\d+(?:,\d+)*\b'   # digits grouped with commas
    pattern3 = r'//.*?\n'            # line comments
    pattern4 = r',\s*}'              # trailing comma before a closing brace
    pattern5 = r'}\s*{'              # adjacent objects without a separator
    # Find all JSON blocks
    matches = re.findall(pattern, text, re.DOTALL)

    # Parse each match to ensure valid JSON
    json_blocks = []
    for match in matches:
        try:
            # Remove any leading/trailing whitespace and parse as JSON
            s = match.strip()
            s = re.sub(pattern1, lambda x: x.group().replace('_', ''), s)
            s = re.sub(pattern2, lambda x: x.group().replace(',', ''), s)
            s = re.sub(pattern3, '', s)
            s = re.sub(pattern4, '}', s)
            s = s.replace("\\'", "'").replace("\\&", "&").replace('}\njson', '}')
            if re.search(pattern5, s):
                s = re.sub(pattern5, '},\n{', s)
                s = '[\n%s\n]' % s
            # NOTE(review): any occurrence of 'acres' in the block triggers the conversion,
            # even when it sits inside another field's text; kept as-is for compatibility.
            convert_area = 'acres' in s
            if convert_area:
                s = s.replace(' acres,', ',')
            json_data = json.loads(s)
            if convert_area:
                # Handles a single object as well as a list of objects, and no longer
                # raises TypeError when 'area' is null or not a number.
                _scale_acre_areas(json_data)
            json_blocks.append(json_data)
        except json.JSONDecodeError as w:
            print(w)
            # If parsing fails, print an error message (can log or handle as needed)
            print(i)
            print(s)
            print("Warning: Found an invalid JSON block.") 
    return flatten_if_nested(json_blocks)


In [15]:
# Decode the JSON blocks of every converted answer; parsed_answers[i] belongs to questions[i].
parsed_answers = []

# `json_answers` may be ordered differently from `questions`, so index it by id once.
# setdefault keeps the first entry per id, matching the old first-match linear search.
json_answer_by_id = {}
for entry in json_answers:
    json_answer_by_id.setdefault(entry['id'], entry)

for i, q in enumerate(questions):
    # Prefer the positional entry; fall back to the id index. Previously a failed
    # lookup silently parsed another question's answer.
    a = json_answers[i] if i < len(json_answers) else None
    if a is None or a['id'] != q['id']:
        if q['id'] not in json_answer_by_id:
            raise KeyError('no JSON answer found for question id %r' % (q['id'],))
        a = json_answer_by_id[q['id']]
    parsed_answers.append(extract_json_blocks(a['content'], i))

In [8]:
from geopy.geocoders import Nominatim
from pyproj import Geod

# WGS84 ellipsoid object, passed to evaluate.evaluate_location by the parsed-answer evaluation.
geod = Geod(ellps='WGS84')
# Nominatim geocoding client, passed to evaluate.get_location_by_address by the same cell.
geocoder = Nominatim(user_agent="Geocoder")

In [9]:
import importlib
import evaluate
# Reload `evaluate` (presumably the project-local evaluate.py — confirm) so that edits
# made to it are picked up without restarting the kernel.
importlib.reload(evaluate)
import numpy as np

[nltk_data] Downloading package punkt to /Users/majid/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /Users/majid/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /Users/majid/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /Users/majid/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [21]:

text_evaluation = []

# evaluate text answers

# (substring of the question type, ground-truth field to compare against); the first
# matching row wins, so the order of this table is significant.
text_eval_fields = (
    ('multihop1', 'multihop_long_answer'),
    ('name', 'name'),
    ('loc', 'address'),
    ('angle', 'angle_description'),
    ('area', 'area'),
    ('length', 'length'),
    ('count', 'count'),
    ('distance', 'distance'),
)
# Numeric ground truths are spelled out as words; these suffixes add the unit.
unit_suffix = {'area': ' meters squared', 'length': ' meters', 'distance': ' meters', 'count': ''}

# `answers` may be ordered differently from `questions`, so index it by id once.
# setdefault keeps the first entry per id, matching the old first-match linear search.
answer_by_id = {}
for entry in answers:
    answer_by_id.setdefault(entry['id'], entry)

for i, q in enumerate(questions):
    # Prefer the positional entry; fall back to the id index. Previously a failed
    # lookup silently evaluated another question's answer.
    a = answers[i] if i < len(answers) else None
    if a is None or a['id'] != q['id']:
        if q['id'] not in answer_by_id:
            raise KeyError('no text answer found for question id %r' % (q['id'],))
        a = answer_by_id[q['id']]
    text_answer = a['content']

    # '' (no row matched) is passed through to get_osm_value unchanged, as before.
    key = next((field for marker, field in text_eval_fields if marker in q['type']), '')

    true_answer = []
    for reference in q['answers']:
        v = evaluate.get_osm_value(reference, key)
        if v is None:
            continue
        if key in unit_suffix:
            v = num2words(v) + unit_suffix[key]
        true_answer.append(v)

    if len(text_answer):
        true_answer = '\n'.join(true_answer)
        # NOTE(review): a recorded debug output further down scores 'Northwest.' against
        # 'northwest' as (0, 0, 0), which suggests the comparison is case-sensitive on the
        # prediction side — confirm in evaluate.evaluate_entity_names.
        P, R, F1 = evaluate.evaluate_entity_names(text_answer, true_answer)
        text_evaluation.append({'attempted': True, 'P': P, 'R': R, 'F1': F1})
    else:
        text_evaluation.append({'attempted': False, 'P': 0, 'R': 0, 'F1': 0})
    

In [22]:
# One row per question: the text-answer scores plus the question's type and id.
df = pd.DataFrame(text_evaluation).assign(
    type=[q['type'] for q in questions],
    id=[q['id'] for q in questions],
)
df.to_csv(f'./{model_name}_text_eval.csv', index=False)

In [28]:
# Debug helper: print the question, text answer and parsed answer for one question id.
# Leaves `i`, `q`, `a` and `p` bound for the inspection cells that follow.
inspected_id = 1438
i = next((idx for idx, candidate in enumerate(questions) if candidate['id'] == inspected_id), None)
if i is None:
    raise KeyError('no question with id %r' % (inspected_id,))
q = questions[i]
# parsed_answers is built per question position, so it is indexed with `i`; the old
# fallback indexed it with a position inside `answers`, which selects another question.
p = parsed_answers[i]
a = answers[i] if i < len(answers) else None
if a is None or a['id'] != q['id']:
    a = next((entry for entry in answers if entry['id'] == q['id']), None)
    if a is None:
        raise KeyError('no text answer found for question id %r' % (q['id'],))
print(q['question'])
print(a)
print(p)

What's the heading to the closest university from Santa Barbara Museum of Natural History Sea Center, Santa Barbara, CA?
{'content': 'Northwest.', 'id': 1438}
[{'name': 'Santa Barbara Museum of Natural History Sea Center', 'address': '100 N Harbor Way, Santa Barbara, CA 93109', 'distance': None, 'length': None, 'area': None, 'azimuth_angle': 315}]


In [38]:
from evaluate import normalize_text
# Ground-truth compass description for the first reference answer of the question
# selected by the lookup cell above (`q`, `a` are left over from it).
v = evaluate.get_osm_value(q['answers'][0], 'angle_description')

# Displays: ground truth, raw model answer, both normalized forms, and the (P, R, F1) score.
# NOTE(review): the recorded output shows ['Northwest'] vs ['northwest'] scoring (0, 0, 0),
# i.e. the mismatch looks like letter case only — confirm whether normalize_text should lowercase.
v, a['content'], normalize_text(a['content']), normalize_text(v), evaluate.evaluate_entity_names(a['content'], v)

('northwest', 'Northwest.', ['Northwest'], ['northwest'], (0, 0, 0))

In [19]:
# Display the stored JSON-formatted answer at position `i` (left over from the lookup cell above).
json_answers[i]

{'content': '```json\n{\n  "name": "University of California, Santa Barbara",\n  "address": "Santa Barbara, CA",\n  "distance": null,\n  "length": null,\n  "area": null,\n  "azimuth_angle": null\n}\n```\n\nNote: The answer does not provide enough information to determine the distance, length, area, or azimuth angle.',
 'id': 1438}

In [10]:
import tqdm

In [16]:
# evaluate parsed_answers
#
# For every question the best score over all (reference answer, parsed record) pairs is
# kept in a dict that always has 'attempted' and, depending on the question type,
# 'P'/'R'/'F1', 'distance_error', 'angle_error' or 'relative_error'.
parsed_evaluation = []
progress = tqdm.tqdm(range(len(questions)))

# Everything int() can raise on model-produced values: wrong type, unparsable text or
# NaN, and infinity (json.loads accepts NaN/Infinity). Replaces the former bare
# `except:`, which also swallowed KeyboardInterrupt.
_NUMBER_ERRORS = (TypeError, ValueError, OverflowError)
# Location errors above this value are clamped to 1.0, smaller ones are divided by it
# (presumably metres, as returned by evaluate.evaluate_location — confirm).
_DISTANCE_LIMIT = 5*10**5
# Question-type markers that share the relative-error scoring, in matching order.
_MEASUREMENT_KEYS = ('area', 'length', 'count', 'distance')


def imporved_f1(new_f1, scores):
    """Return True when `scores` holds no F1 yet or `new_f1` is strictly better."""
    return ('F1' not in scores) or (new_f1 > scores['F1'])


def _is_blank(value):
    """Return True for predictions that cannot be scored as text: None, empty or unsized."""
    if value is None:
        return True
    try:
        return len(value) == 0
    except TypeError:
        # e.g. a bare number where text was expected; previously this crashed the loop
        return True


def _score_text(q, records, truth_key, pred_key_of, scores):
    """Return the best P/R/F1 of a textual attribute over all reference/record pairs."""
    for truth in q['answers']:
        v = evaluate.get_osm_value(truth, truth_key)
        if v is None:
            continue
        for p in records:
            pred_answer = p.get(pred_key_of(truth), None)
            if _is_blank(pred_answer):
                continue
            P, R, F1 = evaluate.evaluate_entity_names(pred_answer, v)
            if imporved_f1(F1, scores):
                scores = {'attempted': True, 'P': P, 'R': R, 'F1': F1}
    return scores


def _score_measurement(q, records, key, scores):
    """Store the smallest relative error of numeric attribute `key` in `scores` (in place)."""
    for truth in q['answers']:
        v = evaluate.get_osm_value(truth, key)
        if v is None:
            continue
        for p in records:
            pred_v = p.get(key, None)
            if pred_v is None:
                continue
            try:
                pred_v = int(pred_v)
            except _NUMBER_ERRORS:
                continue
            relative_error = evaluate.evaluate_measurement(pred_v, v)
            if relative_error < scores.get('relative_error', float('inf')):
                scores['relative_error'] = relative_error
                scores['attempted'] = True


for i in progress:
    q = questions[i]
    # Only JSON objects can be queried by attribute; stray scalars or strings produced
    # by the parser are ignored instead of raising AttributeError on `.get`.
    parsed_answer = [p for p in parsed_answers[i] if isinstance(p, dict)]
    scores = {'attempted': False}
    if 'multihop1' in q['type']:
        # The predicted value is stored under the attribute name the question asks for.
        scores = _score_text(q, parsed_answer, 'multihop_answer',
                             lambda truth: truth['multihop_attribute'], scores)
    elif 'name' in q['type']:
        scores = _score_text(q, parsed_answer, 'name', lambda truth: 'name', scores)
    elif 'loc' in q['type']:
        for a in q['answers']:
            v = evaluate.get_osm_value(a, 'address')
            loc = evaluate.get_osm_value(a, 'location')
            if v is None:
                continue
            for p in parsed_answer:
                pred_answer = p.get('address', None)
                if _is_blank(pred_answer):
                    continue
                if isinstance(pred_answer, list):
                    # address given as a list of components
                    pred_answer = ', '.join(map(str, pred_answer))
                P, R, F1 = evaluate.evaluate_entity_names(pred_answer, v)
                if imporved_f1(F1, scores):
                    scores.update({'attempted': True, 'P': P, 'R': R, 'F1': F1})
                # Besides the textual match, geocode the predicted address and measure
                # how far it lies from the reference location.
                pred_loc = evaluate.get_location_by_address(geocoder, pred_answer)
                if pred_loc is None:
                    continue
                distance_error = evaluate.evaluate_location(geod, [pred_loc], [loc])[0]
                if distance_error > _DISTANCE_LIMIT:
                    distance_error = 1.0
                else:
                    distance_error /= _DISTANCE_LIMIT
                if distance_error < scores.get('distance_error', float('inf')):
                    scores['distance_error'] = distance_error
    elif 'angle' in q['type']:
        for a in q['answers']:
            angle = evaluate.get_osm_value(a, 'angle')
            angle_desc = evaluate.get_osm_value(a, 'angle_description')
            if angle is None:
                continue
            for p in parsed_answer:
                pred_angle = p.get('azimuth_angle', None)
                try:
                    pred_angle = int(pred_angle)
                except _NUMBER_ERRORS:
                    continue
                # Score the compass description derived from the predicted angle ...
                pred_answer = evaluate.get_angle_desc(pred_angle)
                if _is_blank(pred_answer):
                    continue
                P, R, F1 = evaluate.evaluate_entity_names(pred_answer, angle_desc)
                if imporved_f1(F1, scores):
                    scores.update({'attempted': True, 'P': P, 'R': R, 'F1': F1})
                # ... and the numeric deviation from the reference angle.
                angle_error = evaluate.evaluate_angle([pred_angle], [angle])[0]
                if angle_error < scores.get('angle_error', float('inf')):
                    scores['angle_error'] = angle_error
    else:
        # area / length / count / distance share one scoring rule; first marker wins.
        for measurement_key in _MEASUREMENT_KEYS:
            if measurement_key in q['type']:
                _score_measurement(q, parsed_answer, measurement_key, scores)
                break
    parsed_evaluation.append(scores)

100%|██████████| 2800/2800 [00:02<00:00, 1075.48it/s]


In [17]:
# One row per question: the parsed-answer scores plus the question's type and id.
df = pd.DataFrame(parsed_evaluation)
df['type'] = [q['type'] for q in questions]
df['id'] = [q['id'] for q in questions]

# Metrics a question never produced are filled with the worst value for that metric.
# A metric column that is missing altogether (no question produced it, e.g. on a
# partial run) is created instead of raising KeyError, so the CSV keeps a stable schema.
worst_metric_value = {
    'P': 0,
    'R': 0,
    'F1': 0,
    'distance_error': 1.0,
    'angle_error': 1.0,
    'relative_error': 1.0,
}
for column, worst in worst_metric_value.items():
    if column in df.columns:
        df.loc[df[column].isna(), column] = worst
    else:
        df[column] = float(worst)

df.to_csv(f'./{model_name}_parsed_eval.csv', index=False)