In [1]:
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex,
    SimpleField,
    SearchableField,
    SearchField,
    SearchFieldDataType,
    VectorSearch,
    HnswAlgorithmConfiguration,
    VectorSearchProfile,
)
from azure.search.documents.models import VectorizedQuery
import json
import os
import glob
import csv
import re
import MeCab
import uuid
from jinja2 import Environment, FileSystemLoader
from openai import OpenAI
import tiktoken
from azure.search.documents.models import VectorizedQuery

# Load variables from a local .env file (if present) into the process environment.
load_dotenv()

# Connection settings read from the environment; os.getenv returns None when a variable is unset.
SEARCH_ENDPOINT = os.getenv("SEARCH_ENDPOINT")
SEARCH_KEY = os.getenv("SEARCH_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")  
# Chat model used by translate_chunks.
OPENAI_MODEL_NAME_JUDGE = "gpt-4o-mini"
# Chat model used by the classification, intent-extraction and answer-generation calls.
OPENAI_MODEL_NAME_ANS = "gpt-4o"
# Embedding model name.
OPENAI_EMB_NAME = "text-embedding-3-large"
# Name of the index that create_search_index creates or updates.
# NOTE(review): hybrid_search queries "rag-1_index" / "rag-1_index2", not this name — confirm which is intended.
SEARCH_INDEX_NAME="rag1_index"


# NOTE(review): `credential` is not referenced anywhere else in the visible notebook.
credential = DefaultAzureCredential()
# Key-based credential shared by the index client and the per-query search clients.
search_credential = AzureKeyCredential(SEARCH_KEY)
openai_client = OpenAI(api_key=OPENAI_API_KEY) 
index_client = SearchIndexClient(endpoint=SEARCH_ENDPOINT, credential=search_credential)
# Prompt templates are loaded from the relative "prompts" directory.
jinja_env = Environment(loader=FileSystemLoader('prompts'))


# 1.インデックス作成

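以下のコードでインデックスを２種類作成する（1回の実行で作成されるのは `SEARCH_INDEX_NAME` で指定した1つのインデックスのみなので、インデックス名と `chunk_text` の `max_chunk_size` / `overlap_sentences` を変更して2回実行する）。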
- chunk_size:2056, overlap:10のインデックス : rag-1_index
- chunk_size:512, overlap:0のインデックス : rag-1_index2

In [None]:
def create_search_index():
    """Create or update the search index named by ``SEARCH_INDEX_NAME``.

    The schema holds a string key, searchable text fields (title and author
    are also filterable) and a 3072-dimension vector field bound to the
    "vector-profile" vector search profile.

    Returns:
        The object returned by ``index_client.create_or_update_index``.

    Raises:
        Exception: Any error from the service call is re-raised after the
            error and the serialized index definition have been printed.
    """
    string_type = SearchFieldDataType.String

    index_definition = SearchIndex(
        name=SEARCH_INDEX_NAME,
        fields=[
            SimpleField(name="id", type=string_type, key=True),
            SearchableField(name="title", type=string_type, filterable=True),
            SearchableField(name="author", type=string_type, filterable=True),
            SearchableField(name="content", type=string_type),
            SearchableField(name="content_trans", type=string_type),
            SearchField(
                name="content_vector",
                type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                searchable=True,
                vector_search_dimensions=3072,
                vector_search_profile_name="vector-profile",
            ),
            SearchableField(name="keywords", type=string_type),
        ],
        vector_search=create_vector_search(),
    )

    try:
        created = index_client.create_or_update_index(index_definition)
        print(f"Index created or updated: {created.name}")
        return created
    except Exception as e:
        # Dump the definition so a rejected schema can be inspected, then propagate.
        print(f"Error creating index: {str(e)}")
        print(f"Index definition: {index_definition.serialize()}")
        raise


def create_vector_search(
        algorithm_name="my-algorithms-config",
        vector_search_profile_name="vector-profile"
        ):
    """Build the vector search configuration for the index.

    Args:
        algorithm_name: Name given to the HNSW algorithm configuration.
        vector_search_profile_name: Name of the profile that points at that
            algorithm configuration.

    Returns:
        A ``VectorSearch`` with one profile and one HNSW algorithm configuration.
    """
    profile = VectorSearchProfile(
        name=vector_search_profile_name,
        algorithm_configuration_name=algorithm_name,
    )
    hnsw_config = HnswAlgorithmConfiguration(name=algorithm_name)
    return VectorSearch(profiles=[profile], algorithms=[hnsw_config])

def chunk_text(text, max_chunk_size=512, overlap_sentences=0):
    """Split a document into title, author and sentence-aligned chunks.

    The first line of ``text`` is taken as the title and the second as the
    author; everything after that is the body. The body is split into
    sentences at "。" and at newlines, and sentences are packed greedily into
    chunks of at most ``max_chunk_size`` characters (a single sentence longer
    than the limit still becomes its own chunk).

    Args:
        text: Full document text.
        max_chunk_size: Character budget per chunk.
        overlap_sentences: Number of trailing sentences of a finished chunk
            that are repeated at the start of the next chunk.

    Returns:
        Tuple ``(title, author, chunks)`` where ``chunks`` is a list of strings.
        ``author`` is ``""`` when the text has no second line.
    """
    lines = text.split('\n')
    title = lines[0].strip()
    # A one-line (or empty) text has no author line; do not raise IndexError.
    author = lines[1].strip() if len(lines) > 1 else ""
    content = '\n'.join(lines[2:])
    sentences = content.replace('。', '。\n').split('\n')
    sentences = [s.strip() for s in sentences if s.strip()]

    chunks = []
    current_chunk = ""
    # Holds at most `overlap_sentences` most recent sentences (always empty when it is 0).
    overlap_buffer = []

    for sentence in sentences:
        if current_chunk and len(current_chunk) + len(sentence) > max_chunk_size:
            chunks.append(current_chunk.strip())
            # Seed the next chunk with the overlap carried over from this one.
            current_chunk = ''.join(overlap_buffer)

        current_chunk += sentence
        overlap_buffer.append(sentence)
        if len(overlap_buffer) > overlap_sentences:
            overlap_buffer.pop(0)

    if current_chunk:
        chunks.append(current_chunk.strip())

    return title, author, chunks


def translate_chunks(chunks):
    """Ask the chat model to process a chunk with the ``translate.jinja`` prompt.

    Args:
        chunks: Text passed to the template as the ``chunk`` variable.

    Returns:
        The message content of the first completion choice.
    """
    system_prompt = jinja_env.get_template('translate.jinja').render(chunk=chunks)
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": "指示を実行してください。"},
    ]
    completion = openai_client.chat.completions.create(
        model=OPENAI_MODEL_NAME_JUDGE,
        messages=conversation,
    )
    return completion.choices[0].message.content

def extract_keywords(text):
    """Return the distinct nouns of ``text`` as a comma-separated string.

    Markup of the forms 《...》, ［＃...］ and ｜ is removed first, then the text
    is tokenized with MeCab and every token whose first feature is "名詞"
    (noun) is kept.

    Args:
        text: Japanese text to analyse.

    Returns:
        Nouns joined by ", ", de-duplicated in order of first appearance.
    """
    tagger = MeCab.Tagger()

    for markup_pattern in (r'《.*?》', r'［＃.*?］', r'｜'):
        text = re.sub(markup_pattern, '', text)

    nouns = []
    token = tagger.parseToNode(text)
    while token:
        part_of_speech = token.feature.split(',')[0]
        if part_of_speech == '名詞':
            nouns.append(token.surface)
        token = token.next

    # dict keys keep insertion order, so this drops duplicates but preserves order.
    return ', '.join(dict.fromkeys(nouns))

def get_embedding(text):
    """Return the embedding vector of ``text``.

    Uses the model named by ``OPENAI_EMB_NAME`` so that indexing and querying
    (which defines the same helper later in the notebook) stay on one model.

    Args:
        text: Input text to embed.

    Returns:
        The ``embedding`` of the first item in the API response.
    """
    response = openai_client.embeddings.create(
        input=text,
        model=OPENAI_EMB_NAME,
    )
    return response.data[0].embedding

def process_and_register_document(file_path):
    """Chunk one text file, enrich every chunk and upload it to the search index.

    For each chunk returned by ``chunk_text`` this extracts keywords, obtains
    the ``translate_chunks`` output, embeds the space-joined concatenation of
    chunk, translation and keywords, and builds one index document. All
    documents of the file are uploaded in a single call to the index named by
    ``SEARCH_INDEX_NAME``.

    Args:
        file_path: Path of a UTF-8 text file (title line, author line, body).

    Returns:
        Number of chunks (documents) built from the file; 0 if there were none.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()

    title, author, chunks = chunk_text(text)
    documents = []
    for i, chunk in enumerate(chunks):
        keywords = extract_keywords(chunk)
        translated_result = translate_chunks(chunk)
        result_vec = " ".join([chunk, translated_result, keywords])
        vector = get_embedding(result_vec)

        # Generate the id once so the logged id is the one actually stored.
        doc_id = f"{uuid.uuid4()}_chunk_{i+1}"
        documents.append({
            "id": doc_id,
            "title": title,
            "author": author,
            "content": chunk,
            "content_trans": translated_result,
            "content_vector": vector,
            "keywords": keywords,
        })

        print(f"{doc_id}:")
        print(f"  Title: {title}")
        print(f"  Author: {author}")
        print(f"  Content: {chunk[:50]}...")
        print(f"  Content_Translated: {translated_result[:50]}...")
        print(f"  Vector (first 5 elements): {vector[:5]}")
        print(f"  Keywords: {keywords[:10]}...")
        print("--" * 10)

    if not documents:
        # Nothing to index (e.g. a file with only title/author lines).
        print(f"No chunks found in {file_path}; nothing uploaded")
        return 0

    # Build the client here: no module-level `search_client` exists in this notebook.
    search_client = SearchClient(
        endpoint=SEARCH_ENDPOINT,
        index_name=SEARCH_INDEX_NAME,
        credential=search_credential,
    )
    result = search_client.upload_documents(documents)
    print(f"Uploaded {len(result)} documents from {file_path}")
    return len(documents)

def main():
    """Create the index, then register every ``*.txt`` file under ``./data``.

    Prints the total number of chunks registered across all files.
    """
    create_search_index()

    text_file_pattern = os.path.join("./data", "*.txt")
    total_chunks = sum(
        process_and_register_document(file_path)
        for file_path in glob.glob(text_file_pattern)
    )

    print(f"Total number of chunks registered: {total_chunks}")


if __name__ == "__main__":
    main()

# 2. RAG回答作成

In [None]:
def truncate_to_token_limit(text: str, limit: int) -> str:
    """Cut ``text`` so that it is at most ``limit`` tokens long.

    Tokens are counted with the tiktoken encoding for "gpt-4o". Text that is
    already within the limit is returned unchanged.

    Args:
        text: Text to truncate.
        limit: Maximum number of tokens to keep.

    Returns:
        The original text, or its first ``limit`` tokens decoded back to a string.
    """
    encoding = tiktoken.encoding_for_model("gpt-4o")
    tokens = encoding.encode(text)
    if len(tokens) <= limit:
        return text
    truncated = encoding.decode(tokens[:limit])
    # A token boundary can fall inside a multi-byte character (common for
    # Japanese); decoding then yields U+FFFD at the end. Drop that debris.
    return truncated.rstrip("\ufffd")

def read_query_csv(file_path):
    """Read a CSV file with a header row into a list of row dicts.

    Args:
        file_path: Path of the CSV file.

    Returns:
        One ``dict`` per data row, keyed by the header names.
    """
    # newline='' lets the csv module handle line endings itself, so quoted
    # multi-line fields are preserved; utf-8-sig also accepts plain UTF-8 but
    # strips a BOM that would otherwise end up inside the first header name.
    with open(file_path, 'r', encoding='utf-8-sig', newline='') as f:
        return list(csv.DictReader(f))

def get_embedding(text):
    """Embed ``text`` with the model named by ``OPENAI_EMB_NAME``.

    NOTE(review): a function with the same name is also defined in the
    indexing cell; whichever cell runs last wins.

    Args:
        text: Input text to embed.

    Returns:
        The ``embedding`` of the first item in the API response.
    """
    embedding_response = openai_client.embeddings.create(input=text, model=OPENAI_EMB_NAME)
    first_item = embedding_response.data[0]
    return first_item.embedding

def hybrid_search(problem, query, filter_title, top_k, query_type):
    """Run a combined keyword + vector search and return the matching chunk texts.

    Args:
        problem: Original question text.
        query: Extracted intent of the question; placed first in both the
            embedded text and the keyword query.
        filter_title: Title to restrict results to, or "不明" for no filter.
        top_k: Number of results requested, for both the vector query and the
            overall result count.
        query_type: Question class; selects which index is queried.

    Returns:
        List of ``{"content": <chunk text>}`` dicts, in result order.

    Raises:
        ValueError: If ``query_type`` has no index assigned.
    """
    # Index queried for each question class.
    index_by_query_type = {
        "計算型": "rag-1_index2",
        "グローバル文脈解釈型": "rag-1_index2",
        "選択肢型": "rag-1_index",
        "ローカル文脈解釈型": "rag-1_index",
        "人物・モノ列挙型": "rag-1_index",
    }
    if query_type not in index_by_query_type:
        raise ValueError("Invalid query_type provided.")

    search_client = SearchClient(
        endpoint=SEARCH_ENDPOINT,
        index_name=index_by_query_type[query_type],
        credential=search_credential,
    )

    combined_query = f"{query}{problem} "
    query_vector = get_embedding(combined_query)

    vector_query = VectorizedQuery(
        vector=query_vector,
        k_nearest_neighbors=top_k,
        fields="content_vector"
    )

    # NOTE(review): "^3" looks like Lucene boost syntax, but no query_type is
    # passed to search() below — confirm it has the intended effect.
    keyword_query = f"{query}^3{problem} "

    if filter_title != "不明":
        # OData string literals escape a single quote by doubling it; the
        # title comes from an LLM, so it must not be able to break the filter.
        escaped_title = filter_title.replace("'", "''")
        filter_expression = f"search.in(title, '{escaped_title}')"
    else:
        filter_expression = None

    results = search_client.search(
        search_text=keyword_query,
        vector_queries=[vector_query],
        filter=filter_expression,
        select=["id", "title", "author", "content", "content_trans", "keywords"],
        top=top_k,
    )

    # Only the chunk text is passed on to answer generation.
    filtered_results = [{"content": result["content"]} for result in results]

    for i, content in enumerate(filtered_results, 1):
        print(f"Result {i}:")
        print(f"Content: {content}")
        print("\n")

    return filtered_results

def query_intent(query,title):
    """Extract the intent of a question with the chat model.

    The ``query_intent.jinja`` prompt is rendered with the question and the
    title, and the model is forced to answer through the ``provide_intent``
    function so the result arrives as JSON.

    Args:
        query: Question text.
        title: Title passed to the prompt template.

    Returns:
        The ``intent`` string from the function-call arguments.
    """
    intent_function = {
        "name": "provide_intent",
        "description": "質問の意図を抽出する",
        "parameters": {
            "type": "object",
            "properties": {
                "intent": {"type": "string", "description": "質問意図"},
            },
            "required": ["intent"],
        },
    }

    prompt_template = jinja_env.get_template("query_intent.jinja")
    conversation = [
        {"role": "system", "content": prompt_template.render(query=query, title=title)},
        {"role": "user", "content": "指示を実行する"},
    ]

    completion = openai_client.chat.completions.create(
        model=OPENAI_MODEL_NAME_ANS,
        messages=conversation,
        functions=[intent_function],
        function_call={"name": "provide_intent"},
        temperature=0,
    )

    arguments = json.loads(completion.choices[0].message.function_call.arguments)
    return arguments["intent"]

def generate_answer_draft(context, query, title, cls):
    """Produce a free-text draft answer using the prompt for the question class.

    Args:
        context: Search results; serialized to JSON and given to every
            template except ``answer_process_q1.jinja``.
        query: Question text.
        title: Title; only given to ``answer_process_q1.jinja``.
        cls: Question class used to pick the prompt template.

    Returns:
        The draft answer text returned by the chat model (also printed).
    """
    # Pick the prompt template that matches the question class.
    templates_by_class = {
        "キーワード探索型": "answer_process_q1.jinja",
        "グローバル文脈解釈型": "answer_process_q2.jinja",
        "ローカル文脈解釈型": "answer_process_q2.jinja",
        "計算型": "answer_process_q3.jinja",
        "選択肢型": "answer_process_q4.jinja",
        "人物・モノ列挙型": "answer_process_q5.jinja"
    }
    template_name = templates_by_class.get(cls)
    template = jinja_env.get_template(template_name)

    context_json = json.dumps(context, ensure_ascii=False)

    # The q1 prompt works from the title alone; all others get the retrieved context.
    uses_title_only = template_name == "answer_process_q1.jinja"
    render_args = (
        {"title": title, "query": query}
        if uses_title_only
        else {"context": context_json, "query": query}
    )
    system_message = template.render(**render_args)

    completion = openai_client.chat.completions.create(
        model=OPENAI_MODEL_NAME_ANS,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": "指示を実行してください。"},
        ],
        temperature=0,
    )

    answer_draft = completion.choices[0].message.content
    print(answer_draft)
    return answer_draft

def generate_answer(answer_draft):
    """Turn a draft into a final ``(answer, evidence)`` pair.

    The ``answer_generate.jinja`` prompt is rendered with the draft and sent
    as the user message; the model must reply through the
    ``provide_answer_and_evidence`` function. Both fields are then cut to at
    most 50 tokens with ``truncate_to_token_limit``.

    Args:
        answer_draft: Draft answer text.

    Returns:
        Tuple ``(answer, evidence)`` of truncated strings.
    """
    answer_function = {
        "name": "provide_answer_and_evidence",
        "description": "回答と根拠を提供する",
        "parameters": {
            "type": "object",
            "properties": {
                "answer": {"type": "string", "description": "回答(50トークン以内)"},
                "evidence": {"type": "string", "description": "根拠（50トークン以内）"},
            },
            "required": ["answer", "evidence"],
        },
    }

    conversation = [
        {"role": "system", "content": "あなたは小説に関する質問に対して正確な回答をするAIアシスタントです。"},
        {
            "role": "user",
            "content": jinja_env.get_template("answer_generate.jinja").render(answer_draft=answer_draft),
        },
    ]

    completion = openai_client.chat.completions.create(
        model=OPENAI_MODEL_NAME_ANS,
        messages=conversation,
        functions=[answer_function],
        function_call={"name": "provide_answer_and_evidence"},
        temperature=1,
    )

    payload = json.loads(completion.choices[0].message.function_call.arguments)
    # The schema only asks the model for <= 50 tokens; enforce it here.
    for field in ("answer", "evidence"):
        payload[field] = truncate_to_token_limit(payload[field], 50)

    print(payload)
    print("---" * 30)

    return payload["answer"], payload["evidence"]

def _count_keyword_in_file(file_path, keyword_pattern):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()

        if keyword_pattern.startswith("r'") and keyword_pattern.endswith("'"):
            keyword_pattern = keyword_pattern[2:-1]

        count = len(re.findall(keyword_pattern, text))
        result = {
            "answer": f"{count} 回",
            "evidence": "文章検索の結果より"
        }
        return result
    except FileNotFoundError:
        print(f"ファイル {file_path} が見つかりませんでした。")
        return None
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return None

def search_query_answer(query):
    """Answer a "how many times does X appear" question by counting in the corpus.

    The chat model extracts a search keyword (or an ``r'...'`` regular
    expression) from the question, and the number of matches in
    ``./data/combined/all.txt`` is returned as the answer.

    Args:
        query: Question text.

    Returns:
        Tuple ``(answer, evidence)``. When counting fails, a fixed
        "not found" message pair is returned instead.
    """
    # The prompt previously opened with four quotes (a stray '"' in the text)
    # and had a garbled instruction about repetition-insensitive matching.
    system_message = """
    #役割
    あなたは質問の中の検索キーワードを抜き出すプロフェッショナルです。
    
    #指示
    - 質問を解釈し、検索すべきキーワードを抜き出してください。
    - 文字の繰り返し回数は問わないといった場合は正規表現としてください。
    
    #例
    質問 : 「田中」という名前は何回登場しますか？
    抽出キーワード：田中
    
    質問: 「おお…」（おの回数は問わない）という声が登場する回数は何回ですか？
    抽出キーワード : r'(おお)+'
    
    #Tools
    keywordの変数に抽出キーワードを格納する
    """

    functions = [
        {
            "name": "provide_keyword",
            "description": "質問内の検索キーワードを抽出する",
            "parameters": {
                "type": "object",
                "properties": {
                    "keyword": {
                        "type": "string",
                        "description": "検索の抽出キーワード"
                    }
                },
                "required": ["keyword"]
            }
        }
    ]

    response = openai_client.chat.completions.create(
        model=OPENAI_MODEL_NAME_ANS,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": f"#質問 : {query}"}
        ],
        functions=functions,
        function_call={"name": "provide_keyword"},
        temperature=0
    )

    function_call = response.choices[0].message.function_call
    keyword = json.loads(function_call.arguments)['keyword']
    print(keyword)

    result = _count_keyword_in_file("./data/combined/all.txt", keyword)

    if result is None:
        # Counting failed (missing file, invalid regex, ...): return a placeholder pair.
        return "検索結果が見つかりませんでした。", "エラーが発生したか、ファイルが見つかりませんでした。"

    print(f'回答:{result["answer"]}  根拠:{result["evidence"]}')
    print("---" * 30)
    return result["answer"], result["evidence"]

def judge_title(query):
    """Ask the chat model which novel a question refers to.

    Args:
        query: Question text.

    Returns:
        The ``title`` string from the ``provide_title`` function-call
        arguments (the schema lists the candidate titles, including "不明").
    """
    title_function = {
        "name": "provide_title",
        "description": "小説タイトルを提供する",
        "parameters": {
            "type": "object",
            "properties": {
                "title": {"type": "string", "description": '判断された小説タイトル ["不如帰","流行暗殺節","カインの末裔","競漕","芽生","サーカスの怪人","死生に関するいくつかの断想","不明"]'}
            },
            "required": ["title"],
        },
    }

    conversation = [
        {"role": "system", "content": jinja_env.get_template('judge_title.jinja').render()},
        {"role": "user", "content": f"質問: {query}"},
    ]

    completion = openai_client.chat.completions.create(
        model=OPENAI_MODEL_NAME_ANS,
        messages=conversation,
        functions=[title_function],
        function_call={"name": "provide_title"},
        temperature=0,
    )

    arguments = json.loads(completion.choices[0].message.function_call.arguments)
    return arguments["title"]

def judge_query(query):
    """Classify a question into one of the question types.

    Args:
        query: Question text.

    Returns:
        The ``cls`` string from the ``judge_query`` function-call arguments,
        with surrounding whitespace removed so it can be compared against the
        type labels used elsewhere in the notebook.
    """
    template = jinja_env.get_template('judge_query.jinja')
    system_message = template.render()
    user_message = f"質問: {query}"

    functions = [{
        "name": "judge_query",
        "description": "質問の分類型を提供する",
        "parameters": {
            "type": "object",
            "properties": {
                # "計算型" used to be listed with a leading space, which the model
                # could echo back and which then matched no downstream branch.
                "cls": {"type": "string", "description": '判断された質問の種類 ["選択肢型","計算型","グローバル文脈解釈型","ローカル文脈解釈型","キーワード探索型","キーワード特定型","人物・モノ列挙型"]'}
            },
            "required": ["cls"]
        }
    }]

    response = openai_client.chat.completions.create(
        model=OPENAI_MODEL_NAME_ANS,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ],
        functions=functions,
        function_call={"name": "judge_query"},
        temperature=0
    )

    result = json.loads(response.choices[0].message.function_call.arguments)
    # Callers compare the label with exact strings; tolerate stray whitespace.
    return result["cls"].strip()

def process_queries(queries, trial_num):
    """Answer every question and write the results to the submission CSV.

    Each question is classified by title and type, then routed:

    * "キーワード特定型": keyword counting via ``search_query_answer``.
    * "キーワード探索型": answer drafted without retrieved context.
    * anything else: intent extraction, hybrid search (10 results for
      "計算型" / "グローバル文脈解釈型", otherwise 4), draft, final answer.

    Args:
        queries: Row dicts with at least the keys ``index`` and ``problem``.
        trial_num: Suffix used in the output name ``../submit/submit_<trial_num>.csv``.

    Returns:
        None. Rows of ``[index, answer, evidence]`` are written without a header.
    """
    wide_context_types = ["計算型", "グローバル文脈解釈型"]

    results = []
    for query in queries:
        index = query['index']
        problem = query['problem']
        title = judge_title(problem)
        cls = judge_query(problem)

        if cls == "キーワード特定型":
            print(f"TYPE1:{cls}")
            # Pass the question text, not the whole CSV row dict.
            answer, evidence = search_query_answer(problem)

        elif cls == "キーワード探索型":
            print(f"TYPE2:{cls}")
            answer_draft = generate_answer_draft(context="", query=problem, title=title, cls=cls)
            answer, evidence = generate_answer(answer_draft)

        else:
            needs_wide_context = cls in wide_context_types
            print(f"TYPE3:{cls}" if needs_wide_context else f"TYPE4:{cls}")

            # The intent is only used to build the search query, so it is only
            # requested for the retrieval-based types.
            problem_intent = query_intent(query=problem, title=title)
            print(problem_intent)

            context = hybrid_search(
                problem=problem,
                query=problem_intent,
                filter_title=title,
                top_k=10 if needs_wide_context else 4,
                query_type=cls,
            )
            answer_draft = generate_answer_draft(context=context, query=problem, title=title, cls=cls)
            answer, evidence = generate_answer(answer_draft)

        results.append([index, answer, evidence])

    output_file = f"../submit/submit_{trial_num}.csv"
    # Make sure the target directory exists so a long run is not lost at the very end.
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerows(results)

def main():
    """Answer the questions in ``../input/query.csv`` as trial "001"."""
    loaded_queries = read_query_csv("../input/query.csv")
    process_queries(loaded_queries, "001")


if __name__ == "__main__":
    main()

# 3. submit用zipファイルの作成

In [None]:
import os
import shutil
import zipfile

def create_submission_zip_from_csv(input_csv, output_zip="submission.zip"):
    """Package a results CSV as a submission ZIP.

    The archive contains exactly one entry, ``predictions.csv``, holding the
    contents of ``input_csv``. The file is added straight to the archive, so
    no scratch directory is created in (or removed from) the working directory.

    Args:
        input_csv: Path of the CSV file to package.
        output_zip: Path of the ZIP file to create (overwritten if present).

    Raises:
        FileNotFoundError: If ``input_csv`` is not an existing file; checked
            before the archive is opened so no empty ZIP is left behind.
    """
    if not os.path.isfile(input_csv):
        raise FileNotFoundError(f"Input CSV not found: {input_csv}")

    with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # arcname renames the file inside the archive.
        zipf.write(input_csv, arcname="predictions.csv")

    print(f"Submission ZIP file created: {output_zip}")

# Usage example
if __name__ == "__main__":
    input_csv_file = "../submit/submit_001.csv"
    output_zip_file = "../submit/submission_001.zip"
    create_submission_zip_from_csv(input_csv_file, output_zip_file)