Copyright &copy; 2024 Praneeth Vadlapati

In [None]:
import os
from common_functions import get_lm_response, extract_data, print_error, \
								print_progress, model_small, backticks

# ---- Task definition ----
# The criteria sentence is appended to the input-data sentence to form the
# baseline (un-optimized) prompt template.
processing_criteria = 'Provide name and age of people whose age is below 35.'
initial_prompt_template = 'Here is the input data: {structured_input_data}.\n' + processing_criteria
data_format = 'csv'  # 'csv' or 'json'

# ---- CSV fixtures (used unless data_format is 'json') ----
structured_input_data = ''' ```csv
Name,Gender,Age,City
John,Male,25,NYC
Jane,Female,30,LA
Doe,Male,38,Chicago
Emily,Female,48,Houston
Henry,Male,66,Philadelphia
``` '''.strip()
expected_response = ''' ```csv
Name,Age
John,25
Jane,30
``` '''.strip()
example_response = ''' ```csv
Name,Age
Alice,22
Bob,30
``` '''.strip()

# ---- JSON fixtures (same records as the CSV fixtures) ----
structured_input_data_json = ''' ```json
[
	{"Name": "John", "Gender": "Male", "Age": 25, "City": "NYC"},
	{"Name": "Jane", "Gender": "Female", "Age": 30, "City": "LA"},
	{"Name": "Doe", "Gender": "Male", "Age": 38, "City": "Chicago"},
	{"Name": "Emily", "Gender": "Female", "Age": 48, "City": "Houston"},
	{"Name": "Henry", "Gender": "Male", "Age": 66, "City": "Philadelphia"}
]
``` '''.strip()
expected_response_json = ''' ```json
[
	{"Name": "John", "Age": 25},
	{"Name": "Jane", "Age": 30}
]
``` '''.strip()
example_response_json = ''' ```json
[
	{"Name": "Alice", "Age": 22},
	{"Name": "Bob", "Age": 30}
]
``` '''.strip()

# Swap in the JSON fixtures in one step when JSON is the selected format.
if data_format == 'json':
	structured_input_data, expected_response, example_response = (
		structured_input_data_json,
		expected_response_json,
		example_response_json,
	)


# The baseline template has no example_response placeholder; str.format simply
# ignores the unused keyword argument.
initial_prompt = initial_prompt_template.format(
	structured_input_data=structured_input_data,
	example_response=example_response,
)

# Parsed form of the expected answer, compared against parsed model responses.
expected_data = extract_data(expected_response, data_format)
# Responses longer than ten times the expected answer are truncated for logging.
response_length_limit = 10 * len(expected_response)

def shorten_response(current_response):
	"""Return ``current_response`` truncated to ``response_length_limit`` characters.

	Falsy values (``None``, empty string) and responses within the limit are
	returned unchanged; longer responses are cut at the limit and suffixed
	with ``'...'``.
	"""
	if not current_response:
		return current_response
	if len(current_response) <= response_length_limit:
		return current_response
	return current_response[:response_length_limit] + '...'

def _read_template_file(path):
	"""Return the stripped text of ``path``, or ``None`` when the file does not exist."""
	if not os.path.exists(path):
		return None
	with open(path) as template_file:
		return template_file.read().strip()


# Optimized template: reuse a previously saved one, otherwise start from the
# initial template.
current_template_file = f'{data_format} prompt_template_optimized.txt'
current_prompt_template = _read_template_file(current_template_file)

prompt_loaded = bool(current_prompt_template)

if not prompt_loaded:
	current_prompt_template = str(initial_prompt_template)  # copy initial string
current_prompt = current_prompt_template.format(
	structured_input_data=structured_input_data,
	example_response=example_response,
)

# Shortened template: only available when a previous run saved a non-empty one.
shortened_template_file = f'{data_format} prompt_template_shortened.txt'
shortened_prompt_template = _read_template_file(shortened_template_file)

loaded_short_prompt = bool(shortened_prompt_template)

if loaded_short_prompt:
	shortened_prompt = shortened_prompt_template.format(
		structured_input_data=structured_input_data,
		example_response=example_response,
	)
else:
	shortened_prompt = None

print(f'Model: {model_small}')

In [None]:
# Paths of the three log files written during prompt optimization and trials.
prompteng_log_file = 'test_prompt_eng.log'
trial_log_file = 'trial.log'
trial_log_file2 = trial_log_file + '2'

# Start every run with empty logs: opening in 'w' mode creates or truncates each file.
for _log_path in (prompteng_log_file, trial_log_file, trial_log_file2):
	with open(_log_path, 'w') as f:
		f.write('')

# Most recent model response that did not match the expected data.
last_wrong_response = None

def find_accuracy(current_prompt, model=None, no_logs=False, trial_mode=False, total_trials=7):
	"""Estimate how often ``current_prompt`` makes the model return the expected data.

	The prompt is sent to ``get_lm_response`` up to ``total_trials`` times. Each
	response is parsed with ``extract_data`` and compared with ``expected_data``.

	Args:
		current_prompt: Fully formatted prompt text to send to the model.
		model: Passed through to ``get_lm_response`` as its ``model`` argument.
		no_logs: When true, print no progress output and write nothing to the
			trial log files. Parsing errors are still reported via ``print_error``.
		trial_mode: When true, stop early once the zero-based trial index has
			reached 40% of ``total_trials`` without a single correct response.
		total_trials: Number of model calls to make (the divisor of the result).

	Returns:
		``correct_responses / total_trials`` as a float, or ``0.0`` when
		``total_trials`` is not positive.

	Side effects:
		Sets the globals ``current_response`` (last response received) and
		``last_wrong_response`` (last response that was wrong or could not be
		parsed). Appends wrong responses and errors to ``trial_log_file`` and
		correct responses to ``trial_log_file2``.
	"""
	global last_wrong_response, current_response
	if total_trials <= 0:
		# Nothing to run; avoid dividing by zero below.
		return 0.0

	correct_responses = 0

	if not no_logs:
		print(' Finding accuracy', end='', flush=True)
		with open(trial_log_file, 'a') as f:
			f.write('\n\n' + '='*40 + f' Checking {total_trials} times ' + '='*40 + '\n\n')

	for trial_attempt_num in range(total_trials):
		current_response = get_lm_response(current_prompt, model=model)
		try:
			is_correct = extract_data(current_response, data_format) == expected_data
		except Exception as e:
			# A response that cannot be parsed is still a wrong response: keep it
			# so callers can show the model's actual output to the prompt engineer.
			last_wrong_response = current_response
			if not no_logs:
				with open(trial_log_file, 'a') as f:
					f.write(f'Error: {e}\n\n')
					f.write('_'*120 + '\n')
			print_error(e)
		else:
			if is_correct:
				correct_responses += 1
				response_log_file = trial_log_file2
			else:
				last_wrong_response = current_response
				response_log_file = trial_log_file
			if not no_logs:
				with open(response_log_file, 'a') as f:
					f.write(f'Response: {current_response}\n\n')
					f.write('_'*120 + '\n')
				print_progress()
		# In trial mode, give up early when nothing has been correct so far.
		if trial_mode and not correct_responses and trial_attempt_num >= (0.4*total_trials):
			break

	if not no_logs:
		print()  # because end='' was used in the last print statement
	return correct_responses / total_trials  # trial_accuracy

# Confirms that the log files were reset and find_accuracy is defined.
print('Setup done.')

## Optimize the prompt

In [None]:
# Counter of prompt-optimization rounds; updated by optimize_prompt.
attempt_num = 0
# Becomes True once optimize_prompt accepts a prompt (or a saved template was loaded).
prompt_eng_success = False

# optimize_prompt gives up once attempt_num exceeds this limit.
attempts_limit = 4
# Minimum accuracy (fraction of correct trials) for a prompt to count as good.
trial_cutoff_score = 0.8

# Accuracy of the most recently evaluated prompt; None until optimize_prompt sets it.
trial_accuracy = None
# Accuracy measured for the unmodified initial prompt template; None until measured.
initial_accuracy = None

# Meta-prompt sent to the large model to request a rewritten prompt template.
# Names in curly braces are filled in by optimize_prompt via str.format.
# The square-bracket markers ("[structured_input_data]", "[example_response]")
# are converted to curly braces by optimize_prompt, so the model sees the
# literal placeholder syntax it must keep in the template it returns.
prompt_eng_template = '''
Value of structured_input_data variable (includes backticks):
{structured_input_data}
\n --- 
Processing criteria: {processing_criteria}
\n --- 
Current prompt template: {current_prompt_template}
\n --- 
Accuracy of current prompt: {trial_accuracy}
Current response: {current_response}
\n --- 
Expected response: {expected_response}
\n --- 

Act as a Prompt Engineer and an expert Linguist. Write prompt to process structured data using language model.
Rewrite prompt template to generate expected response (including its special characters and backticks).
Use example_response placeholder to indicate a sample response. Don't add your own sample in the template.

Don't include answer or expected response.
CSV responses must include expected column names without extra columns.
Write only new prompt template without any other text.
At last, emphasize on the processing criteria I mentioned.
Backticks and format must be exactly same as the example response.

Avoid backticks like ```.
Mention "[structured_input_data]" placeholder to indicate input data.
Make sure response includes backticks.
Don't miss both placeholders mentioned in curly braces.
Add "**Write like**: [example_response]" as placeholder to mention example response.
'''.strip()

def _request_new_prompt_template(previous_template, previous_response, previous_accuracy):
	"""Ask the large model to rewrite ``previous_template`` and validate the rewrite.

	Args:
		previous_template: Template that was just evaluated.
		previous_response: A model response produced with that template.
		previous_accuracy: Accuracy of that template as a fraction (0 to 1).

	Returns:
		``(template, prompt)`` for the first rewrite that passes validation,
		where ``prompt`` is the template formatted with the input data, or
		``None`` when no valid rewrite was obtained within the limits.
	"""
	counted_attempts_limit = 3  # over-long or unformattable rewrites count against this
	llm_calls_limit = 10  # hard cap on LLM calls, including the uncounted retries below

	# Convert the template's [placeholder] markers into escaped braces BEFORE
	# formatting. Replacing brackets after formatting would also rewrite the
	# brackets inside the inserted data (for example JSON arrays).
	request = prompt_eng_template.replace('[', '{{').replace(']', '}}').format(
		structured_input_data=structured_input_data,
		example_response=example_response,
		processing_criteria=processing_criteria,
		current_prompt_template=previous_template,
		current_response=previous_response,
		expected_response=expected_response,
		trial_accuracy=int(previous_accuracy * 100),
	)

	counted_attempts = 0
	llm_calls = 0
	while counted_attempts < counted_attempts_limit and llm_calls < llm_calls_limit:
		llm_calls += 1
		print(f'Calling LLM for prompt engineering attempt {counted_attempts + 1}...')
		candidate = get_lm_response(request, use_large_model=True)
		# if prompt has more than 500 words, try again
		if len(candidate.split()) > 500:
			print('Prompt is too long. Trying again.')
			counted_attempts += 1
			continue
		# Missing placeholders and backticks are retried without counting as an
		# attempt; llm_calls_limit keeps these retries bounded.
		if 'structured_input_data' not in candidate or 'example_response' not in candidate:
			print('Prompt template has missing placeholders. Trying again.')
			continue
		if backticks in candidate:
			print('Prompt template has backticks. Trying again.')
			continue
		try:
			candidate_prompt = candidate.format(
				structured_input_data=structured_input_data, example_response=example_response)
		except Exception as e:
			# The rewrite comes from the model and may contain stray braces or
			# unknown field names that str.format rejects.
			print_error(e)
			counted_attempts += 1
			continue
		return candidate, candidate_prompt
	return None


def optimize_prompt():
	"""Iteratively rewrite the prompt template until its trial accuracy is good enough.

	Each round measures the current prompt with ``find_accuracy``. A prompt
	that reaches ``trial_cutoff_score`` is saved to ``current_template_file``
	if it is the most accurate one seen so far. Unless the loop stops, the
	large model is then asked for a rewritten template.

	The loop stops when:
	- accuracy is 1 within the first few rounds,
	- a good prompt is found after the first few rounds,
	- a failing prompt is seen after more than ``attempts_limit`` rounds, or
	- no valid rewrite can be obtained.

	When at least one prompt passed, the best passing template is restored into
	the globals on exit, so they always match the saved file. Does nothing
	except mark success when a template was already loaded from disk.

	Returns:
		None. Results are left in the globals ``current_prompt_template``,
		``current_prompt``, ``trial_accuracy``, ``prompt_eng_success``,
		``initial_accuracy`` and ``attempt_num``.
	"""
	global current_prompt_template, current_prompt, trial_accuracy, attempt_num, \
		current_response, prompt_eng_success, initial_accuracy
	if prompt_loaded:
		prompt_eng_success = True
		print(f'\nCurrent prompt template was already loaded. New prompt is not required.')
		trial_accuracy = trial_cutoff_score
		return  # temporarily avoid running

	best_candidate = None  # (accuracy, template, prompt) of the best passing prompt

	while True:
		# -------------- Find accuracy of the current prompt template --------------
		trial_accuracy = find_accuracy(current_prompt)
		if current_prompt_template == initial_prompt_template:
			initial_accuracy = trial_accuracy
		print(f'Trial accuracy: {trial_accuracy}')

		# Judge this round on its own result, not on an earlier success, so a
		# worse rewrite can never overwrite a saved good template.
		if trial_accuracy >= trial_cutoff_score:
			print(f'Trial accuracy: {trial_accuracy} (Good).')
			prompt_eng_success = True
			# -------------- Save current prompt template --------------
			if best_candidate is None or trial_accuracy > best_candidate[0]:
				best_candidate = (trial_accuracy, current_prompt_template, current_prompt)
				with open(current_template_file, 'w') as f:
					f.write(current_prompt_template)
			if attempt_num > 2:  # in later rounds, stop as soon as accuracy is good
				break
			if trial_accuracy == 1:  # stop if accuracy 100%
				print(f'Accuracy is {trial_accuracy} (BEST). Exiting.')
				break
			# good but <100% accuracy in the first few rounds: try to improve the prompt
		else:
			print(f'Trial accuracy: {trial_accuracy} (Failed).')
			if last_wrong_response is not None:
				current_response = last_wrong_response
			if attempt_num > attempts_limit:
				print(f'Prompt Optimization failed after {attempt_num} attempts. Exiting.')
				break
			if attempt_num:  # non-zero
				print(f'Failed attempt {attempt_num}')
			with open(prompteng_log_file, 'a') as f:
				f.write(f'Prompt: {current_prompt_template}\n\n')
				f.write('-'*80 + '\n')
				f.write(f'Last Wrong Response: {shorten_response(current_response)}\n\n')
				f.write('_'*120 + '\n')

		# Count every rewrite round (passing or failing) so the loop always ends.
		attempt_num += 1

		# -------------- Change prompt using LLM --------------
		rewritten = _request_new_prompt_template(
			current_prompt_template, current_response, trial_accuracy)
		if rewritten is None:
			# The last evaluated template is kept; it is never replaced by an
			# invalid rewrite.
			print('Prompt engineering failed. Exiting.')
			break
		current_prompt_template, current_prompt = rewritten

	if best_candidate is not None:
		# Leave the globals pointing at the template that was actually saved.
		trial_accuracy, current_prompt_template, current_prompt = best_candidate

# Run the optimization; results are left in the module-level globals that
# optimize_prompt declares (current_prompt_template, trial_accuracy, ...).
optimize_prompt()

In [None]:
# Show the prompt template currently selected (loaded from file or produced by optimize_prompt).
print(current_prompt_template)

## Shorten the prompt, maintaining the accuracy

In [5]:
def shorten_prompt_using_function(shortener_function, max_attempts=5):
	"""Shorten the current prompt template without lowering its trial accuracy.

	Each attempt calls ``shortener_function`` with a copy of the current
	template and checks the candidate:
	- it must not be longer than the current template,
	- it must mention both placeholders and format cleanly, and
	- its accuracy from ``find_accuracy`` (trial mode) must be at least the
	  current ``trial_accuracy``.

	A shorter candidate that passes is saved to ``shortened_template_file`` and
	ends the search. A same-length candidate is accepted only when it is more
	accurate; the search then continues without using up an attempt.

	Args:
		shortener_function: Callable taking a template string and returning a
			(hopefully shorter) template string.
		max_attempts: Maximum number of counted shortening attempts.

	Returns:
		None. Results are left in the globals ``current_prompt_template``,
		``trial_accuracy``, ``shortened_prompt_template`` and
		``shortened_prompt``. When no attempt succeeds, the current template is
		saved and used as the "shortened" one.
	"""
	global current_prompt_template, trial_accuracy, shortened_prompt_template, shortened_prompt
	if loaded_short_prompt:
		print('Shortened prompt template is already loaded from file. Exiting.')
		return
	if not prompt_eng_success:
		if trial_accuracy is None:  # no trial was conducted at all
			print('Prompt optimization has not been run. Nothing to shorten.')
		else:  # Trial conducted and failed
			print('Prompt engineering failed. Could not send the prompt for shortening.')
		return

	shorten_attempt_num = 0
	shortening_succeeded = False
	while shorten_attempt_num < max_attempts:
		shorten_attempt_num += 1
		shortened_prompt_template = shortener_function(str(current_prompt_template))  # pass a copy
		if len(shortened_prompt_template) > len(current_prompt_template):
			print('Prompt became longer. Retrying.')
			continue
		if 'structured_input_data' not in shortened_prompt_template or \
				'example_response' not in shortened_prompt_template:
			print('Shortened prompt template has missing placeholders. Retrying.')
			continue
		try:
			shortened_prompt = shortened_prompt_template.format(
				structured_input_data=structured_input_data, example_response=example_response)
		except Exception:
			# The candidate comes from the shortener and may contain stray
			# braces or unknown field names that str.format rejects.
			print('Shortened prompt failed to format. Retrying.')
			continue

		short_trial_accuracy = find_accuracy(shortened_prompt, trial_mode=True)
		if short_trial_accuracy < trial_accuracy:
			print(f'Shortened prompt accuracy is {short_trial_accuracy}, not improved. Retrying...')
			continue
		print(f'Shortened prompt accuracy: {short_trial_accuracy}')

		old_length = len(current_prompt_template)
		reduction_percent = ((old_length - len(shortened_prompt_template)) / old_length) * 100
		if reduction_percent <= 0:
			print('Length of Shortened prompt is same as original prompt.')
			if short_trial_accuracy <= trial_accuracy:  # same length without improvement in accuracy
				continue
			print('\t But it is more accurate.')  # proceed with next steps

		current_prompt_template = shortened_prompt_template
		trial_accuracy = short_trial_accuracy
		with open(shortened_template_file, 'w') as f:
			f.write(shortened_prompt_template)

		print(f'\n SUCCESS: Shortened prompt is {reduction_percent:.2f}% shorter. Saved prompt.')
		if reduction_percent <= 0:
			print('Accuracy improved, but length not reduced. Retrying to shorten the prompt further...')
			# Accuracy strictly improved, so this retry does not use up an attempt.
			shorten_attempt_num -= 1
			continue

		shortening_succeeded = True
		break  # shortened by maintaining accuracy

	# Decide on an explicit flag rather than the attempt counter: a success on
	# the final allowed attempt also leaves the counter equal to max_attempts.
	if not shortening_succeeded:
		print(f'Failed to shorten the prompt after {max_attempts} attempts. Exited.')

		# Fall back to the current template so later cells still have a
		# "shortened" prompt to work with.
		shortened_prompt_template = current_prompt_template
		with open(shortened_template_file, 'w') as f:
			f.write(shortened_prompt_template)

		shortened_prompt = shortened_prompt_template.format(
			structured_input_data=structured_input_data, example_response=example_response)


# Meta-prompt asking the large model to shorten a prompt template.
# shorten_prompt_LLM fills {current_prompt_template} with the template to
# shorten and {example_response} with the sample response via str.format.
shorten_instruction = '''
Current prompt template:
{current_prompt_template}
\n --- 
Be a Prompt Engineer. Shorten the above prompt template.
Make sure the prompt is short and concise.
Retain the placeholders values and key information.
Do not remove placeholders 'structured_input_data' and 'example_response'.
Return only the shortened prompt without any other text.
When a language model uses the prompt to generate response, backticks and format must be exactly same as the example response.
Make sure response from the model includes the formatted data inside backticks with the format like {example_response}
'''.strip()

def shorten_prompt_LLM(template):
	"""Ask the large model for a shorter version of ``template``.

	The request is ``shorten_instruction`` filled with ``template`` and the
	module-level ``example_response``; whatever ``get_lm_response`` returns is
	passed back unchanged.
	"""
	shorten_request = shorten_instruction.format(
		current_prompt_template=template,
		example_response=example_response,
	)
	return get_lm_response(shorten_request, use_large_model=True)

# Shorten the current template using the LLM-backed shortener with the default attempt limit.
shorten_prompt_using_function(shorten_prompt_LLM)

## Testing final prompt using multiple models

In [None]:
# Models to evaluate; the small model is always included exactly once.
models_to_test = []
if model_small not in models_to_test:
	models_to_test.append(model_small)

if prompt_eng_success:
	for model in models_to_test:
		print(f'\nUsing model {model} (type: {data_format})...')

		# Optimized prompt.
		model_trial_accuracy = find_accuracy(current_prompt, model=model, total_trials=10)
		print(model_trial_accuracy)

		# Shortened prompt, measured separately only when it differs from the optimized one.
		shortened_prompt_text = ''
		if shortened_prompt:
			if current_prompt != shortened_prompt:
				model_shortened_accuracy = find_accuracy(shortened_prompt, model=model, total_trials=10)
			else:
				model_shortened_accuracy = model_trial_accuracy
			shortened_prompt_text = f'& {model_shortened_accuracy} (shortened prompt) '
			print(model_shortened_accuracy)

		# Initial prompt: reuse the accuracy already measured for the small model when available.
		if initial_accuracy is not None and model == model_small:
			model_initial_accuracy = initial_accuracy
		else:
			model_initial_accuracy = find_accuracy(initial_prompt, model=model, total_trials=10)
		print(model_initial_accuracy)

		# Append one summary line per model to the results file.
		val = f'{model}: ({data_format}) {model_trial_accuracy} (optimized prompt) {shortened_prompt_text}& {model_initial_accuracy} (initial prompt)\n'
		with open('results.txt', 'a') as f:
			f.write(val)
else:
	print('Prompt engineering failed. Could not send the prompt for final testing.')