#### Demo for generation

In [None]:
# -*- coding: utf-8 -*-  
from openai import OpenAI
import time
import requests
import json
import os

# Path to the system_prompt
# Read through a context manager so the handle is closed, and with the same
# explicit UTF-8 encoding that is used for the topic file below.
with open('/path/to/language.txt', 'r', encoding='utf-8') as prompt_file:
    system_prompt = prompt_file.read()
print(system_prompt)

# Use LLM API
url = "****************"

headers = {
    "Content-Type": "*****************",
    "Authorization": "****************"
}

# Path to the dialogue topic
with open('/path/to/topic.txt','r',encoding='utf-8') as input_file:
    # dict.fromkeys removes duplicates while keeping the file order, so
    # topic_list[0] is the same on every run (a set has no stable order).
    # Blank lines are dropped so that an empty string is never used as a topic.
    topic_list = list(dict.fromkeys(
        line.strip() for line in input_file if line.strip()
    ))

# Output directory for the generated dialogues.
path = '/path/to/result/language'
os.makedirs(path, exist_ok=True)


In [None]:
import random
import json
import re
from tqdm import tqdm

# Randomly obtain the number of turns of dialogue for this session.
def random_number():
    """Pick the number of dialogue turns for one session.

    A single uniform sample from ``random.random()`` is mapped to
    4 turns (sample below 0.3), 6 turns (sample below 0.65) or
    8 turns (any other sample).
    """
    draw = random.random()
    for upper_bound, turns in ((0.3, 4), (0.65, 6)):
        if draw < upper_bound:
            return turns
    return 8

# Get the dialogue data
def clean_and_parse_json(input_str):
    """Extract, normalise and validate one generated dialogue.

    The first ``{...}`` span found in ``input_str`` is taken as the JSON
    payload, a fixed list of textual repairs is applied to it, and the parsed
    object is run through the check/fix helpers defined in this file.

    Args:
        input_str: Raw text returned by the model.

    Returns:
        The repaired dialogue dict, or ``None`` when no JSON object is found,
        the JSON does not parse, a validation check fails, or the object does
        not have the structure the helpers expect.
    """
    match = re.search(r'\{.*?\}', input_str, re.DOTALL)
    if not match:
        return None

    # Fix some common writing errors. The replacements are applied in this
    # exact order; later entries may act on the output of earlier ones.
    json_str = match.group(0)
    for old, new in (
        ('history turns', 'history_turns'),
        ('<A>', 'A'),
        ('[A]', 'A'),
        ('<B>', 'B'),
        ('[B]', 'B'),
    ):
        json_str = json_str.replace(old, new)
    # Drop a trailing comma before the closing brace.
    json_str = re.sub(r',\s*}', '}', json_str)
    for old, new in (
        ("\xa0", ""),
        ('higher', 'high'),
        ('slower', 'slow'),
        ('Speaker A', 'A'),
        ('Speaker B', 'B'),
        (", excited,", ", happy,"),
        (", thoughtful,", ", neutral,"),
        (", curious,", ", neutral,"),
        (", shocked,", ", surprised,"),
        (", scared,", ", fearful,"),
        (", nervous,", ", disgusted,"),
        (", concerned,", ", neutral,"),
        ("response_of_current_style_style_1", "response_of_current_style_1"),
        ("response_of_current_style_style_2", "response_of_current_style_2"),
        ("response_of_current_style_style_3", "response_of_current_style_3"),
        ('“', '"'),
        ('”', '"'),
    ):
        json_str = json_str.replace(old, new)

    try:
        json_data = json.loads(json_str)
    except json.JSONDecodeError as e:
        print(e)
        return None

    # Check the dialogue format
    try:
        turns = json_data.get("history_turns", [])
        # An even number of history turns, and at least four, is required.
        if len(turns) % 2 != 0 or len(turns) < 4:
            return None
        if not check_content(json_data):
            return None
        if not check_json_emotion(json_data):
            return None
        if not check_json_other(json_data):
            return None
        json_data = fix_dialogue_json_speaker(json_data)
        if not check_json_speaker(json_data):
            return None
        json_data = fix_dialogue_json_age(json_data)

        # The responder is whoever did not speak the current turn; this does
        # not change inside the loop, so it is computed once.
        current_speaker = json_data["current_turn"].split(":")[0].strip()
        speaker = 'B' if current_speaker == 'A' else 'A'
        for i in range(1, 4):
            key = f"response_of_current_style_{i}"
            json_data[key] = speaker + json_data[key][1:]

        json_data = fix_dialogue_json_gender(json_data)

        # Collapse doubled spaces left behind by the string surgery above.
        for key, value in json_data.items():
            if isinstance(value, str):
                json_data[key] = value.replace("  ", " ")
        return json_data
    except (KeyError, IndexError, AttributeError, TypeError) as e:
        # JSON that parses but lacks the expected keys or tuple fields is as
        # unusable as invalid JSON: report it and reject it the same way.
        print(f"Malformed dialogue skipped: {e!r}")
        return None

def fix_dialogue_json_speaker(data):
    """Rewrite the speaker labels of a dialogue, modifying ``data`` in place.

    History turns are relabelled alternately ``A``, ``B``, ``A``, ... starting
    from ``A``. ``current_turn`` gets speaker ``A`` and each
    ``response_of_current_style_{1,2,3}`` gets speaker ``B``.

    For history turns and responses everything up to and including the first
    ``(`` is replaced by ``"<label> ("``; for ``current_turn`` everything up to
    and including the first ``:`` is replaced by ``"A:"``.

    Returns:
        The same ``data`` object.
    """
    turns = data["history_turns"]
    for idx, turn in enumerate(turns):
        label = 'B' if idx % 2 else 'A'
        # partition(...)[2] is the text after the first "(" ("" if absent).
        turns[idx] = f"{label} ({turn.partition('(')[2]}"

    # Fix speaker in current_turn
    data["current_turn"] = "A:" + data["current_turn"].partition(":")[2]

    # Fix speaker in response_of_current_style_i
    for style in (1, 2, 3):
        response_key = f"response_of_current_style_{style}"
        data[response_key] = "B (" + data[response_key].partition("(")[2]
    return data

def fix_dialogue_json_age(data):
    """Propagate each speaker's last tuple field to the current turn and responses.

    The last comma-separated field before the first ``)`` of history turn 0
    and of history turn 1 is read (the age attribute, going by the function
    name). The first value overwrites the last ``", "``-separated field of each
    ``current_turn_style_{1,2,3}``; the second overwrites the last field inside
    the first parenthesised tuple of each ``response_of_current_style_{1,2,3}``.
    Doubled spaces in the rewritten strings are collapsed once.

    ``data`` is modified in place and also returned.
    """
    def last_field(turn):
        # Text before the first ")" -> last comma-separated item, trimmed.
        return turn.split(')')[0].split(',')[-1].strip()

    age_first = last_field(data["history_turns"][0])
    age_second = last_field(data["history_turns"][1])

    for n in (1, 2, 3):
        style_key = f"current_turn_style_{n}"
        fields = data[style_key].split(", ")
        # The style string ends with ")", so the closing paren is re-added.
        fields[-1] = f"{age_first})"
        data[style_key] = ", ".join(fields).replace("  ", " ")

    for n in (1, 2, 3):
        reply_key = f"response_of_current_style_{n}"
        reply = data[reply_key]
        open_at = reply.find('(')
        close_at = reply.find(')', open_at)
        fields = reply[open_at + 1:close_at].split(", ")
        fields[-1] = age_second
        rebuilt = reply[:open_at + 1] + ", ".join(fields) + reply[close_at:]
        data[reply_key] = rebuilt.replace("  ", " ")

    return data

def fix_dialogue_json_gender(data):
    """Make the gender attributes consistent with the first two history turns.

    The first field of the parenthesised tuple in history turn 0 and in
    history turn 1 is read as the first and second speaker's gender.

    * Each ``current_turn_style_{1,2,3}`` gets the first speaker's gender as
      its first field.
    * Each ``response_of_current_style_{1,2,3}`` gets ``male`` when the second
      speaker is ``male`` and ``female`` otherwise. Only the first field of
      the first parenthesised tuple is rewritten, so the words "male" and
      "female" inside the spoken text are left untouched.

    ``data`` is modified in place and also returned.
    """
    def leading_field(turn):
        # First comma-separated item after the first "(".
        return turn.strip().split("(")[1].strip().split(",")[0].strip()

    json_data = data
    first_gender = leading_field(json_data["history_turns"][0])
    second_gender = leading_field(json_data["history_turns"][1])

    # Fix speaker's gender in current_turn_style_i
    for i in range(1, 4):
        key = f"current_turn_style_{i}"
        # [1:-1] drops the surrounding parentheses. Each field is trimmed so
        # that re-joining with ", " does not produce doubled spaces.
        style_parts = [part.strip() for part in json_data[key][1:-1].split(",")]
        style_parts[0] = first_gender
        json_data[key] = f"({', '.join(style_parts)})"

    # Fix speaker's gender in response_of_current_style_i
    response_gender = 'male' if second_gender == 'male' else 'female'
    for i in range(1, 4):
        key = f"response_of_current_style_{i}"
        value = json_data[key]
        start = value.find('(')
        end = value.find(')', start)
        if start == -1 or end == -1:
            # No attribute tuple to repair; leave the response as it is.
            continue
        fields = value[start + 1:end].split(",")
        # Only overwrite a field that actually holds a gender.
        if fields[0].strip() in ('male', 'female'):
            fields[0] = response_gender
            json_data[key] = value[:start + 1] + ",".join(fields) + value[end:]

    return json_data

def check_content(data):
    """Check that ``current_turn`` has the form ``<speaker>:<content>``.

    ``str.split`` never returns an empty list, so testing the number of
    pieces against zero can never fail. This checks what the rest of the
    pipeline relies on instead: there is a ``:`` separator and some
    non-blank text after it.

    Args:
        data: Dialogue dict containing a ``"current_turn"`` string.

    Returns:
        1 when the turn has a separator and non-blank content, else 0.
    """
    _speaker, separator, content = data["current_turn"].partition(':')
    if not separator or not content.strip():
        return 0
    return 1
    
def check_json_speaker(data):
    """Tell whether ``current_turn`` is spoken by the opening speaker.

    Compares the trimmed text before the first ``:`` of ``current_turn``
    with the first non-whitespace character of the first history turn.

    Returns:
        ``True`` when the two labels are equal, ``False`` otherwise.
    """
    label_text, _, _ = data["current_turn"].partition(":")
    opening_label = data["history_turns"][0].strip()[0]
    return label_text.strip() == opening_label

def check_json_emotion(data):
    """Validate the emotion field of every attribute tuple in ``data``.

    Every string value (except the one stored under ``"current_turn"``) and
    every item of every list value must contain a parenthesised,
    comma-separated tuple whose second field is one of the accepted emotions.
    Values of other types are ignored.

    Args:
        data: Parsed dialogue dict.

    Returns:
        1 when every inspected entry carries an accepted emotion, else 0.
        Entries without parentheses, with fewer than two fields, or that are
        not strings count as failures rather than raising.
    """
    target_list = ["neutral", "happy", "angry", "sad", "surprised", "fearful", "disgusted"]

    def check_emotion(text):
        if not isinstance(text, str):
            return False
        start = text.find("(") + 1
        end = text.find(")")
        if start == 0 or end == -1:
            return False
        fields = text[start:end].strip().split(",")
        # A tuple with a single field has no emotion slot to inspect.
        if len(fields) < 2:
            return False
        return fields[1].strip() in target_list

    for key, value in data.items():
        if isinstance(value, str):
            # current_turn holds "<speaker>:<content>" and has no tuple.
            if key != 'current_turn' and not check_emotion(value):
                return 0
        elif isinstance(value, list):
            if not all(check_emotion(item) for item in value):
                return 0
    return 1

def check_json_other(data):
    """Validate the gender and speed fields of every attribute tuple in ``data``.

    Every string value (except the one stored under ``"current_turn"``) and
    every item of every list value must contain a parenthesised,
    comma-separated tuple whose first field is an accepted gender and whose
    third field is an accepted speed. Values of other types are ignored.

    Args:
        data: Parsed dialogue dict.

    Returns:
        1 when every inspected entry passes, else 0. Entries without
        parentheses, with fewer than three fields, or that are not strings
        count as failures rather than raising or being matched against
        unrelated text.
    """
    gender_list = ["male", "female"]
    speed_list = ["slow", "normal", "fast"]

    def check_other(text):
        if not isinstance(text, str):
            return False
        start = text.find("(") + 1
        end = text.find(")")
        # Same guard as check_json_emotion: without both parentheses there is
        # no tuple, and slicing would pick up arbitrary text.
        if start == 0 or end == -1:
            return False
        fields = text[start:end].strip().split(',')
        if len(fields) < 3:
            return False
        gender = fields[0].strip()
        speed = fields[2].strip()
        return gender in gender_list and speed in speed_list

    for key, value in data.items():
        if isinstance(value, str):
            # current_turn holds "<speaker>:<content>" and has no tuple.
            if key != 'current_turn' and not check_other(value):
                return 0
        elif isinstance(value, list):
            if not all(check_other(item) for item in value):
                return 0
    return 1

In [None]:
# Demo for the generation
topic = topic_list[0]
number_turn = random_number()

# NOTE(review): this rebinds system_prompt, so the placeholder is gone after the
# first run; re-running this cell keeps the turn count chosen the first time.
system_prompt = system_prompt.replace('[[[number of turns]]]',f'{number_turn}')
# NOTE(review): the template writes '[[[topic use]]]' but only '[[topic use]]'
# is replaced, so the topic is sent wrapped in one pair of square brackets
# ("[topic]"). Left unchanged here; confirm that this is intended.
user_prompt = '''
    对话发生在场景[[[topic use]]]中，请自由想象在这个[[[topic use]]]场景中可能发生的事件和对话。你可以自己决定两个说话者的身份和他们的关系，请保持对话的自然流畅。
    '''.replace('[[topic use]]', f'{topic}').strip()

data = {
    "model": "gpt-4o-mini",
    "temperature": 1.4,
    "top_p": 1,
    "presence_penalty": 0,
    "frequency_penalty": 0,
    "n": 50,
    "messages": [
        {
            "role": "system",
            "content": system_prompt
        },
        {
            "role": "user",
            "content": user_prompt
        }
    ]
}
# A timeout keeps a stalled connection from blocking the notebook forever.
response = requests.post(
    url,
    headers=headers,
    data=json.dumps(data).encode('utf-8'),
    timeout=300,
)
# Fail here with the HTTP status instead of later with a KeyError on "choices".
response.raise_for_status()
time.sleep(0.5)
result = json.loads(response.content.decode("utf-8"))

# Keep only the completions that survive cleaning and validation.
result_example = []
for choice in result["choices"]:
    input_str = choice["message"]["content"]
    try:
        json_data = clean_and_parse_json(input_str)
    except Exception as e:
        # Best effort: one bad completion must not abort the batch, but the
        # reason is reported instead of being dropped silently.
        print(f"Skipping malformed completion: {e!r}")
        continue
    if json_data is not None:
        result_example.append(json_data)
