# VIF-RAG pipline for Data Synthesis
## Instruction Synthesis from Scratch
We concatenate the seed instructions (<100) with the self-instruct prompt template.

In [6]:
import jsonlines
import json
import random
import re
import os
import copy
import nltk
import numpy as np
import time
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
    RetryError
)
import logging
import signal
from tqdm import tqdm
import requests
from concurrent.futures import ThreadPoolExecutor, TimeoutError


random.seed(0)


seed_instructions = [each.strip() for each in open("./sample_data/seed_instruction.txt").readlines()]

augment_instruction_prompt = """You are an expert for writing instructions. Please provide 50 different instructions that meet the following requirements:
- Instructions are about the format but not style of a response
- Whether instructions are followed can be easily evaluate by a Python function
Here are some examples of instructions we need:
{seed_instructions}
Do not generate instructions about writing style, using metaphor, or translation. Here are some examples of instructions we do not need:
- Incorporate a famous historical quote seamlessly into your answer
- Translate your answer into Pig Latin
- Use only words that are also a type of food
- Respond with a metaphor in every sentence
- Write the response as if you are a character from a Shakespearean play
Please generate one instruction per line in your response and start each line with '- '.
"""


augment_instructions = augment_instruction_prompt.format(seed_instructions='\n'.join(seed_instructions))

print(augment_instructions)



'''
Tips:

augment_instructions is instructions with rewriting prompt

Please use supervision model rewrite each instruction in augment_instructions for K times, save into a augment_instructions.txt file like seed_instruction.txt


'''




You are an expert for writing instructions. Please provide 50 different instructions that meet the following requirements:
- Instructions are about the format but not style of a response
- Whether instructions are followed can be easily evaluate by a Python function
Here are some examples of instructions we need:
Answer with words that begin with the letter 'B'
Construct the reply as if it's a telegram STOP
Use only palindromes
Incorporate a famous movie quote seamlessly into your answer
Write the response backward
Use only words with double letters (e.g., "bookkeeper")
Use only onomatopoeia
Answer with a single sentence that is exactly 100 words long
Use no words containing the letter 'E'
Translate your answer into emojis
Use only the 1000 most common English words
Use words that end with '-ing'
Use only military lingo
Respond with a haiku (5-7-5 syllable structure)
Answer in the form of a sonnet (14 lines with 10 syllables each)
Use only monosyllabic words
Answer with words in alphab

'\nTips:\n\naugment_instructions is instructions with rewriting prompt\n\nPlease use supervision model rewrite each instruction in augment_instructions for K times, save into a augment_instructions.txt file like seed_instruction.txt\n\n\n'

## Instruction Composition & Verification.

Real-world instructions often involve multiple constraints. Here, we demonstrate the random automated combination of instructions with multiple constraints.


In [7]:
import random


input_file_path = './sample_data/seed_instruction.txt'
output_file_path = './output/multi_instruction.txt'

with open(input_file_path, 'r') as file:
    sentences = [line.strip() for line in file.readlines()]


sentences = list(set(sentences))

paired_sentences_set = set()
while len(paired_sentences_set) < 20: # 20 is a example, which can be set to much more higher

    pair = random.sample(sentences, 2)
    
    # Here we provide the 'Multiple Constraints' instruction template; you can also obtain the 'Chain Rule Constraints' using the corresponding template described in our paper.
    # Moreover, you can obtain instructions with 3 or even 4 constraints by adding {}.
    
    combined = '{}. {}'.format(pair[0], pair[1])
    
    paired_sentences_set.add(combined)


paired_sentences = list(paired_sentences_set)


with open(output_file_path, 'w') as file:
    for sentence in paired_sentences:
        file.write(sentence + '\n')

'''
Tips:


You can also use supervision model rewrite each instruction in multi_instruction for K times


'''


'\nTips:\n\n\nYou can also use supervision model rewrite each instruction in multi_instruction for K times\n\n\n'

## Multi-instruction Verification
We design a verification prompt for the supervision model to check the internal consistency of the instruction.

In [8]:
import jsonlines
import json
import random
import re
import os
import copy
import nltk
import numpy as np
import time
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
    RetryError
)
import logging
import signal
from tqdm import tqdm
import requests
from concurrent.futures import ThreadPoolExecutor, TimeoutError


#
with open('./sample_data/multi_instruction.txt', 'r') as file:
    sentences = [line.strip() for line in file.readlines()]


prompt_template = """You are an expert proficient in determining whether multiple instructions are suitable to be implemented as simultaneous constraints.
[Instructions] {instruction}
The text contains two or more instructions. Based on the semantic coherence and logical connection, assess whether these instructions are suitable to be implemented as simultaneous constraints. Please first conduct a thorough analysis and then assign a score ranging from 0 to 10 on the last line. A score of 0 indicates that the instructions are highly inappropriate to coexist, while a score of 10 signifies that the instructions are very suitable to serve as concurrent constraints. Please ensure that only a score is provided in the format Score: {{score}} without any additional content on the last line."""


score_data=[]
for each in sentences:
    score_data.append(
        {"prompt":prompt_template.format(instruction=each),"instruction":each}
    )


with jsonlines.open("./output/multi_instruct_need_score.jsonl", "w") as f:
    for each in score_data:
        f.write(each)



'''
Tips:

multi_instruct_score.jsonl is instructions with scoring prompt

Please use supervision model assign a score ranging from 0 to 10.


'''


'\nTips:\n\nmulti_instruct_score.jsonl is instructions with scoring prompt\n\nPlease use supervision model assign a score ranging from 0 to 10.\n\n\n'

In [10]:
#Threshold filtering for scores

results = list(jsonlines.open("./sample_data/multi_instruction_score.jsonl"))
filter_results = []
print(len(results))
for result in tqdm(results):
    scores = []
    # for each in result['gpt-answer']:

    score = re.findall(r'Score: (\d+?)$', result['gpt-answer'])
    # score = each
    # import pdb
    # pdb.set_trace()
    if score:
        scores.append(int(score[0]))


    score = int(score[0]) if int(score[0]) else 0
    # import pdb
    # pdb.set_trace()
    if score >= 5:
        filter_results.append(result)
print(len(filter_results))

7


100%|██████████████████████████████████████████| 7/7 [00:00<00:00, 51509.00it/s]

4





## Instruction Rewriting & Quality Verification.

Please perform self-instruct on the single atomic instructions and multi-instructions, then merge them together and place them in './sample_data/augment_instructions.txt'. Furthermore, for the augmented instructions, we generate verification functions and test samples through the supervision model and perform quality verification.


In [13]:
import random
import re
import os
import copy
import nltk
import numpy as np
import time
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
    RetryError
)
import logging
import signal
from tqdm import tqdm
import requests
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import jsonlines


"""Concat seed and augmented instructions for generation, then generation Eval funcs"""

seed_instructions = [each.strip() for each in open("./sample_data/seed_instruction.txt").readlines()]
augment_instructions_processed = [each.strip() for each in open("./sample_data/augment_instructions.txt").readlines()]


prompt_template = """You are an expert for writing evaluation functions in Python to evaluate whether a response strictly follows an instruction.
Here is the instruction: {instruction}
Please write a Python function named `evaluate` to evaluate whether an input string `response` follows this instruction. If it follows, simply return True, otherwise return False.
Please response with a single JSON includes the evaluation function in the key `func`, and a list of three test cases in the key `cases`, which includes an input in the key `input` and an expected output in the key `output` in (true, false).
Here is an example of output JSON format: {{"func": JSON_STR(use only \\n instead of \n), "cases": [{{"input": str, "output": str}}]}}."""


outputs = []
for instruction in seed_instructions + augment_instructions_processed:
    prompt = prompt_template.format(instruction=instruction)
    outputs.append({
        "prompt": prompt,
        "instruction": instruction
    })


with jsonlines.open("./output/eval_func_rft.jsonl", "w") as f:
    for each in outputs:
        f.write(each)


'''
Please TODO:

please generate K verification functions for each sample by supervision model in eval_func_rft.jsonl

'''

'\nPlease TODO:\n\nplease generate K verification functions for each sample by supervision model in eval_func_rft.jsonl\n\n'

In [14]:


random.seed(0)


# test gpt4

os.environ['NLTK_DATA'] = 'your nltk_data data path'
logging.getLogger('nltk').setLevel(logging.CRITICAL)
from nltk import data
data.path.append('your nltk_data data path')



# please merge single atom instruction set and multi instruction set to eval_func_rft.jsonl

path="./sample_data/eval_func_rft.jsonl"


results = list(jsonlines.open(path))


print("Preprocess vertification functions")


collect_packages = []
for result in results:
    res = result['gpt-answer']
    eval_funcs, test_cases = [], []
    for each in res:
        try:
            json_dict = re.findall(r'```json(.*?)```', each, re.DOTALL)[0].strip()
        except IndexError:
            continue
    
    # func rejection
    try:
        res_dict = json.loads(json_dict)
    except json.JSONDecodeError:
        continue
        
    func = res_dict['func']
    
    if '\\n' in func:
        func = func.replace('\\n', '\n')
    try:
        exec(func)
    except Exception:
        continue
    
    for line in func.split('\n'):
        if 'import' in line or 'download' in line or 'requests' in line:
            collect_packages.append(line)
print(list(set(collect_packages)))





print("cross validation for functions and cases")

def timeout_handler(signum, frame):
    raise TimeoutError("Function execution timed out")

filter_results = []
for result in tqdm(results):
    res = result['gpt-answer']
    eval_funcs, test_cases = [], []
    for each in tqdm(res):
        try:
            json_dict = re.findall(r'```json(.*?)```', each, re.DOTALL)[0].strip()
        except IndexError:
            continue

        try:
            res_dict = json.loads(json_dict)
        except json.JSONDecodeError:
            continue

        # func rejection
        func = res_dict['func']
        func = func.strip()
        func = '\n'.join([each for each in func.split('\n') if 'download' not in each and 'requests' not in each])
        try:
            exec(func)
        except Exception:
            continue
        eval_funcs.append(func)

        for each in res_dict['cases']:
            try:
                test_cases.append((each['input'], each['output']))
            except KeyError:
                print(each)
    eval_funcs = list(set(eval_funcs))
    test_cases = list(map(json.loads, set(map(json.dumps, test_cases))))
    if len(eval_funcs) < 3 or len(test_cases) < 10:
        continue

    filtered_test_cases = []

    for each in tqdm(test_cases):
  

        flag = False
        for func in eval_funcs:
            local_vars = {}
 
            try:
                exec(func, globals(), local_vars)
            except Exception:
                continue
            
            if 'evaluate' not in local_vars:
                continue
            eval_func = local_vars['evaluate']
            try:
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(5)
                res = eval_func(each[0])
            except Exception:
                res = None
            finally:
                signal.alarm(0)
            if res is not None and res == each[1]:
                flag = True
        if flag:
            filtered_test_cases.append(each)

    scored_funcs = []
    for func in tqdm(eval_funcs):
        local_vars = {}
        try:
            exec(func, globals(), local_vars)
        except Exception:
                continue
        if 'evaluate' not in local_vars:
            continue

        eval_func = local_vars['evaluate']
        acc = []
        for inp, out in filtered_test_cases:
            try:
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(5)
                res = eval_func(inp)
            except Exception:
                res = None
            finally:
                signal.alarm(0)
            if res is None or res != out:
                acc.append(0)
            else:
                acc.append(1)
        acc = np.mean(acc) if acc else 0
        scored_funcs.append((func, acc))

    valid_funcs = [each for each in scored_funcs if each[1] >= 0.8]
    if not valid_funcs:
        continue

    filter_results.append({
        "instruction": result['instruction'],
        "eval_func": valid_funcs,
        "cases": filtered_test_cases
    })
    

print("finish!!!")

with jsonlines.open("./output/cross_validation.jsonl", "w") as f:
    for each in filter_results:
        f.write(each)

Preprocess vertification functions
[]
cross validation for functions and cases


  0%|                                                     | 0/5 [00:00<?, ?it/s]
100%|███████████████████████████████████████████| 8/8 [00:00<00:00, 4929.40it/s][A

100%|█████████████████████████████████████████| 23/23 [00:00<00:00, 1890.21it/s][A

100%|███████████████████████████████████████████| 8/8 [00:00<00:00, 3678.00it/s][A

100%|███████████████████████████████████████████| 8/8 [00:00<00:00, 3542.49it/s][A

100%|█████████████████████████████████████████| 18/18 [00:00<00:00, 1889.42it/s][A

100%|███████████████████████████████████████████| 6/6 [00:00<00:00, 8873.70it/s][A

100%|██████████████████████████████████████████| 8/8 [00:00<00:00, 17163.39it/s][A

100%|██████████████████████████████████████████| 24/24 [00:00<00:00, 604.96it/s][A

100%|███████████████████████████████████████████| 8/8 [00:00<00:00, 6733.78it/s][A
 60%|███████████████████████████                  | 3/5 [00:00<00:00, 21.19it/s]
100%|██████████████████████████████████████████| 8/8 [00:00<00:00, 17503.6

finish!!!





# Scalable Instruction-Query Synthesis

## Random Instruction-Query Combination. 
We randomly compose the instruction-constrained querysets from these two domains: General and RAG. For general domain, we use the ShareGPT (20K). For RAG domain, we use the training sets from Natural Questions, TriviaQA, HotpotQA, and WebQuestionsSP as mixed QA sources, constructing.

RAG dataset: https://huggingface.co/datasets/dongguanting/RAG-QA-40K

General: https://huggingface.co/datasets/dongguanting/ShareGPT-12K

## Instruction-Query Rejection Sampling.

After combination, we can perform the RFT for instruction-query samples.


In [15]:
import jsonlines
import json
import random
import re
import os
import copy
import nltk
import numpy as np
import time
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
    RetryError
)
import logging
import signal
from tqdm import tqdm
import requests
from concurrent.futures import ThreadPoolExecutor, TimeoutError


random.seed(0)


def load_json(file_path):
    with open(file_path, 'r') as file:
        return json.load(file)




filter_results=[]




with jsonlines.open("./sample_data/cross_validation.jsonl", "r") as f:
    for each in f:
        filter_results.append(each)



sft_data = list(jsonlines.open("your path to sharegot dataset"))
queries = [each['messages'][1]['content'] for each in sft_data if each['source'] == 'en:sharegpt']

queries = [each for each in queries if len(each) > 20 and len(each) < 300]



#General dataset

inputs = []
for instruction in tqdm(filter_results):
    ins_queries = random.sample(queries, 16) #拼16个
    for q in ins_queries:
        prompt = f"Please answer the query strictly following the instruction.\n[instruction] {instruction['instruction']}\n[Query] {q}"
        item = copy.deepcopy(instruction)
        item['prompt'] = prompt
        inputs.append(item)



#RAG dataset

data = load_json("your path to RAG train dataset")


queries=[]

queries = [each["instruction"] for each in data]


for instruction in tqdm(filter_results):
    ins_queries = random.sample(queries, 4) #拼16个
    for q in ins_queries:
        prompt = f"Please answer the query strictly following the instruction.\n[instruction] {instruction['instruction']}\n[Query] {q}"
        item = copy.deepcopy(instruction)
        item['prompt'] = prompt
        inputs.append(item)





with jsonlines.open("./output/instruction_filtered_llama3_72b_query_sampled.jsonl", "w") as f:
    for each in inputs:
        f.write(each)


        
'''
Please TODO:

Please use supervision model perform RFT to generate k Responses for each query
'''

FileNotFoundError: [Errno 2] No such file or directory: 'your path to sharegpt dataset'


## Dual Stage Verification. 

we employ "Executor-based Verification" and "Consistency Verification" process for the instruction-query data.


In [16]:
import jsonlines
import json
import random
import re
import os
import copy
import nltk
import numpy as np
import time
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
    RetryError
)
import logging
import signal
from tqdm import tqdm
import requests
from concurrent.futures import ThreadPoolExecutor, TimeoutError


# 1. Executor-based Verification

random.seed(0)



def timeout_handler(signum, frame):
    raise TimeoutError("Function execution timed out")

results = list(jsonlines.open("./sample_data/query_rft.jsonl"))

filter_samples = []
for result in tqdm(results):
    eval_funcs = []


    for func, score in result['eval_func']:
        local_vars = {}
        exec(func, globals(), local_vars)
        eval_funcs.append(local_vars['evaluate'])
    
    filter_responses = []

    for response in result['gpt-answer']:
        acc = []
        for eval_func in eval_funcs:
            try:
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(5)
                res = eval_func(response)
            except Exception as e: 
                print(e)
                res = None
            finally:
                signal.alarm(0)
            
            if res is not None:
                try:
                    acc.append(int(res))
                except:
                    continue
        acc = np.mean(acc) if acc else 0


        if acc > 0:
            filter_responses.append(response)

    for each in filter_responses:
        try:
            filter_samples.append({
                'instruction': result['instruction'],
                'query': re.findall(r'\[Query\](.*)$', result['prompt'], re.DOTALL)[0].strip(),
                'response': each
            })
        except IndexError:
            print(result['prompt'])


print(len(eval_funcs))
print(len(filter_samples))
filter_samples = list(map(json.loads, set(map(json.dumps, filter_samples))))
print(len(filter_samples))


# Save the data with out score consistency
with jsonlines.open("./output/query_wo_score.jsonl", "w") as f:
    for each in filter_samples:
        f.write(each)

prompt_template = """You are an expert that is good at judging whether a response is following the instruction and query.
[Instruction] {instruction}
[Query] {query}
[Response] {response}
Please notice that the response may not be helpful as it needs to strictly follow the requirements in the Instruction.
You need to judge whether the response answers the query. Please first provide a detailed analysis and then give a score ranking from 0 to 10 at the last line.
Scoring 0 means the response is totally unrelated to the query, while scoring 10 means the response is helpful and highly related to the query.
Please only provide a score in the format `Score: {{score}}` without any other contents at the last line."""

for each in filter_samples:
    each['prompt'] = prompt_template.format(
        instruction=each['instruction'],
        query=each['query'],
        response=each['response']
    )

# Save the data with out scoring prompt
with jsonlines.open("./output/query_need_quality_score.jsonl", "w") as f:
    for each in filter_samples:
        f.write(each)


'''
Please TODO:

Please score the consistency for each query and response
'''

100%|██████████████████████████████████████████| 20/20 [00:00<00:00, 566.92it/s]

name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_syllables' is not defined
name 'count_




'\nPlease TODO:\n\nPlease score the consistency for each query and response\n'

In [17]:
# 2. Consistency Verification

results = list(jsonlines.open("./sample_data/query_rft_score.jsonl"))
filter_results = []
print(len(results))
for result in tqdm(results):
    scores = []
    for each in result['gen']:
        score = re.findall(r'Score: (\d+?)$', each)
        if score:
            scores.append(int(score[0]))
    score = np.mean(scores) if scores else 0
    if score > 8: # quality score
        filter_results.append(result)
print(len(filter_results))



with jsonlines.open("./output/query_score_filter.jsonl", "w") as f:
    for each in filter_results:
        f.write(each)


20


100%|████████████████████████████████████████| 20/20 [00:00<00:00, 21029.35it/s]

16





## SFT data construction

Finally, we fliter out the sample with score > 8 and save it into LlaMA-Factory's SFT data format.

In [18]:
import json

data = []


with open('./sample_data/query_score_filter.jsonl', 'r', encoding='utf-8') as file:
    
    for dat in file:
        d = json.loads(dat)
        data.append(d)


processed_data = []
for item in data:
    
    item['query'] = item['query'][0].upper() + item['query'][1:]
    item['instruction'] = item['instruction'][0].upper()+ item['instruction'][1:]
    if "?" in item['query']:
        inputs = item['query']+" "+item['instruction']+"."

    elif "." in item['query']:
        inputs = item['query']+" "+item['instruction']+"."
    else:
        inputs=item['query']+". "+item['instruction']+"."


    new_item = {
        "instruction": inputs,
        "input": "",
        "output": item['response'],
        "history": []
    }


    processed_data.append(new_item)
print(len(processed_data))



#Save the SFT data as llama factory format

with open('./output/IF_sft_data.json', 'w', encoding='utf-8') as outfile:
    json.dump(processed_data, outfile, indent=4, ensure_ascii=False)


16
